MapReduce 過濾輸入檔案

1 篇文章 / 0 new
author
MapReduce 過濾輸入檔案
Hadoop 提供 PathFilter 介面作為輸入檔案的篩選機制，只要在 driver 中將實作的類別設定到 Job 上即可：
// Driver-side registration: tells FileInputFormat to use MaxPathFilter as the input path filter for this job.
FileInputFormat.setInputPathFilter(job, MaxPathFilter.class);
PathFilter 實作
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
 
/**
 * Input path filter for MapReduce jobs that selects files by name and by age.
 *
 * <p>Two optional configuration keys control the filter:
 * <ul>
 *   <li>{@code file.name} - a regular expression that must match the <em>entire</em>
 *       path string (as returned by {@code Path.toString()}), e.g.
 *       {@code -D file.name=.*.txt}. When absent, every file name is accepted.</li>
 *   <li>{@code file.date} - an integer number of days, e.g. {@code -D file.date=10}
 *       keeps files modified fewer than 10 days ago. When absent, every
 *       modification time is accepted.</li>
 * </ul>
 *
 * <p>Directories are always accepted. Any error while inspecting a path (or an
 * invalid configuration value) causes files to be rejected rather than the
 * listing to fail.
 */
public class MaxPathFilter extends Configured implements PathFilter {
    /** Milliseconds in one day; timestamps are bucketed into whole days with it. */
    private static final long MILLIS_PER_DAY = 86400000L;

    // NOTE: none of the instance fields below has an initializer on purpose.
    // setConf() is overridable and may be invoked from the superclass
    // constructor, i.e. before this class's field initializers would run;
    // an initializer here could silently overwrite what setConf() stored.

    /** Raw value of {@code file.name}, or null when no name restriction applies. */
    private String namePattern;
    /** Raw value of {@code file.date}, or null when no age restriction applies. */
    private String datePattern;
    /** {@link #namePattern} compiled once in setConf(); null when unrestricted or invalid. */
    private Pattern compiledNamePattern;
    /** {@link #datePattern} parsed once in setConf(); only meaningful when datePattern != null. */
    private int maxAgeDays;
    /** Human-readable description of an invalid configuration value, or null when valid. */
    private String configError;

    /**
     * Decides whether {@code path} belongs to the job input.
     *
     * @param path the candidate path
     * @return true for directories and for files satisfying both the name and
     *         the age restriction; false otherwise, including when the path
     *         cannot be inspected or the filter configuration is invalid
     */
    @Override
    public boolean accept(Path path) {
        Configuration conf = getConf();
        if (conf == null) {
            System.err.println("MaxPathFilter: no Configuration has been set, rejecting " + path);
            return false;
        }
        try {
            // Resolve the filesystem from the path itself so that inputs that do not
            // live on the default filesystem are checked against the right one.
            FileSystem fs = path.getFileSystem(conf);
            // Fetched once and shared by the directory check and the age check.
            FileStatus status = fs.getFileStatus(path);
            if (status.isDir()) { // directories are never filtered
                return true;
            }
            if (configError != null) {
                System.err.println("MaxPathFilter: " + configError + ", rejecting " + path);
                return false;
            }
            // Both checks are always evaluated so that each one reports its result.
            boolean fileDate = matchFileDate(path, status);
            boolean fileName = matchFileName(path);
            System.out.printf("比對 檔案:%s, 時間:%s\n", fileName, fileDate);
            return fileDate && fileName;
        } catch (Exception e) {
            // Best effort: a path that cannot be inspected is excluded instead of
            // propagating the failure to whoever is listing the input.
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Stores the configuration and pre-computes the filter criteria from it.
     *
     * <p>A null configuration is ignored and leaves any previous state intact.
     * Invalid {@code file.name} / {@code file.date} values do not throw; they are
     * reported on stderr and make {@link #accept(Path)} reject every file.
     *
     * @param conf the job configuration, may be null
     */
    @Override
    public void setConf(Configuration conf) {
        System.out.println("** setConf "+conf);
        if (conf == null) {
            return;
        }
        // Keep the inherited getConf() in sync instead of shadowing it with a private copy.
        super.setConf(conf);

        namePattern = conf.get("file.name"); // -D file.name=.*.txt
        datePattern = conf.get("file.date"); // -D file.date=10 -> files from the last 10 days
        System.out.printf("比對樣式 檔案:%s, 時間:%s\n",namePattern,datePattern);

        compiledNamePattern = null;
        maxAgeDays = 0;
        configError = null;
        try {
            if (namePattern != null) {
                compiledNamePattern = Pattern.compile(namePattern);
            }
            if (datePattern != null) {
                maxAgeDays = Integer.parseInt(datePattern.trim());
            }
        } catch (IllegalArgumentException e) {
            // Covers both PatternSyntaxException and NumberFormatException.
            configError = "invalid filter configuration (file.name=" + namePattern
                    + ", file.date=" + datePattern + "): " + e.getMessage();
            System.err.println("MaxPathFilter: " + configError);
        }
    }

    /**
     * File-name restriction: the configured regular expression must match the
     * whole path string, not just the last path component.
     *
     * @param path the file being tested
     * @return true when no pattern is configured or the pattern matches
     */
    private boolean matchFileName(Path path) {
        if (compiledNamePattern == null) {
            return true; // no name restriction
        }
        String candidate = path.toString();
        boolean result = compiledNamePattern.matcher(candidate).matches();
        System.out.println("輸入 :" + candidate + " 匹配:" + result);
        return result;
    }

    /**
     * File-age restriction: the number of day boundaries (epoch-based, 24h
     * buckets) between the file's modification time and now must be strictly
     * less than the configured number of days.
     *
     * @param path   the file being tested (used for reporting only)
     * @param status the already-fetched status of {@code path}
     * @return true when no age limit is configured or the file is recent enough
     */
    private boolean matchFileDate(Path path, FileStatus status) {
        if (datePattern == null) {
            return true; // no age restriction
        }
        long today = System.currentTimeMillis() / MILLIS_PER_DAY;
        long modifiedDay = status.getModificationTime() / MILLIS_PER_DAY;
        long daysSinceModified = today - modifiedDay;
        boolean accept = daysSinceModified < maxAgeDays;
        System.out.printf("檔案:%s(異動:%d日前) 是否符合%d日內:%s\n"
                ,path.toString(), daysSinceModified, maxAgeDays, accept);
        return accept;
    }
}
Free Web Hosting