舊版本的 API 提供 StreamXmlRecordReader 類，方便我們將特定 tag 的區段資料傳給 mapper 處理；新版 API 則沒有看到對應的類，但一樣可以透過 XML parser 類自行處理。使用方式主要在於參數的指定。另外需要注意的是：StreamXmlRecordReader 解析後，會把整個區段資料放在 key 中傳入 map，value 則是空白。
from http://lancefox.iteye.com/blog/1838595
public class XmlMapReduce { public static void main(String[] args) throws Exception { JobConf conf = new JobConf(XmlMapReduce.class); conf.setJobName("xml read"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setMapperClass(MyMapper.class); conf.setReducerClass(MyReducer.class); conf.setInputFormat(StreamInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); MultipleInputs.addInputPath(conf, new Path("/xmlin"), StreamInputFormat.class, MyMapper.class); FileOutputFormat.setOutputPath(conf, new Path("/out")); conf.set("stream.recordreader.class", StreamXmlRecordReader.class.getName()); //定義每次要傳遞給 map 的資料起始與結束的標籤 conf.set("stream.recordreader.begin", "<property>"); conf.set("stream.recordreader.end", "</property>"); JobClient.runJob(conf); } public static class MyMapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> { @Override public void map(Text key, Text value, OutputCollector<Text, Text> ctx, Reporter r) throws IOException { System.out.println("map:" + key.toString()); //注意 整個區段資料當 key傳入map, value則是空白 ctx.collect(key, key); } } public static class MyReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterator<Text> value, OutputCollector<Text, Text> ctx, Reporter r) throws IOException { StringBuffer sb = new StringBuffer(); while (value.hasNext()) { Text v = value.next(); System.out.println("reduce:" + v.toString()); sb.append(v.toString()); } ctx.collect(new Text(key.getLength() + ""), new Text(sb.toString())); } } }
from http://lancefox.iteye.com/blog/1838595