Java MapReduce 按日期计数

2022-01-13 00:00:00 hadoop mapreduce word-count java

我是 Hadoop 的新手,我正在尝试做一个 MapReduce 程序,以按日期(按月分组)计算 lecters 的最大前两次出现.所以我的输入是这样的:

I'm new to Hadoop, and i'm trying to do a MapReduce program, to count the max first two occurrencise of lecters by date (grouped by month). So my input is of this kind :

2017-06-01 , A, B, A, C, B, E, F 
2017-06-02 , Q, B, Q, F, K, E, F
2017-06-03 , A, B, A, R, T, E, E 
2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

所以,由于这个 MapReducer 程序的结果,我正在努力,比如:

so, i'm expeting as result of this MapReducer program, something like :

2017-06,  A:4, E:4
2017-07,  A:4, B:4

public class ArrayGiulioTest {

    public static Logger logger = Logger.getLogger(ArrayGiulioTest.class);

    public static class CustomMap extends Mapper<LongWritable, Text, Text, TextWritable> {
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            TextWritable array = new TextWritable();
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, ",");
            String dataAttuale = tokenizer.nextToken().substring(0,
                    line.lastIndexOf("-"));

            Text tmp = null;
            Text[] tmpArray = new Text[tokenizer.countTokens()];
            int i = 0;
            while (tokenizer.hasMoreTokens()) {
                String prod = tokenizer.nextToken(",");

                word.set(dataAttuale);
                tmp = new Text(prod);
                tmpArray[i] = tmp;

                i++;
            }

            array.set(tmpArray);

            context.write(word, array);

        }
    }

    public static class CustomReduce extends Reducer<Text, TextWritable, Text, Text> {


        public void reduce(Text key, Iterator<TextWritable> values,
                Context context) throws IOException, InterruptedException {

            MapWritable map = new MapWritable();
            Text txt = new Text();

            while (values.hasNext()) {
                TextWritable array = values.next();
                Text[] tmpArray = (Text[]) array.toArray();
                for(Text t : tmpArray) {
                    if(map.get(t)!= null) {
                        IntWritable val = (IntWritable) map.get(t);
                        map.put(t, new IntWritable(val.get()+1));
                    } else {
                        map.put(t, new IntWritable(1));
                    }
                }

            }

            Set<Writable> set = map.keySet();
            StringBuffer str = new StringBuffer();
            for(Writable k : set) {

                str.append("key: " + k.toString() + " value: " + map.get(k) + "**");
            }
            txt.set(str.toString());


            context.write(key, txt);
        }
    }

    public static void main(String[] args) throws Exception {
        long inizio = System.currentTimeMillis();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "countProduct");
        job.setJarByClass(ArrayGiulioTest.class);

        job.setMapperClass(CustomMap.class);
        //job.setCombinerClass(CustomReduce.class);
        job.setReducerClass(CustomReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TextWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        long fine = System.currentTimeMillis();
        logger.info("**************************************End" + (End-Start));
        System.exit(1);
    }

}

我已经以这种方式实现了我的自定义 TextWritable:

and i've implemented my custom TextWritable in this way :

public class TextWritable extends ArrayWritable {


    public TextWritable() {
        super(Text.class);
    }
}

..所以当我运行我的 MapReduce 程序时,我会得到这种结果

..so when i run my MapReduce program i obtain a result of this kind

2017-6    wordcount.TextWritable@3e960865
2017-6    wordcount.TextWritable@3e960865

很明显,我的减速器不起作用.似乎是我的 Mapper 的输出

it's obvious that my reducer it doesn't works. It seems the output from my Mapper

有什么想法吗?有人可以说是否是解决问题的正确途径?

Any idea? And someone can says if is the right path to the solution?

这里控制台日志(仅供参考,我的输入文件有 6 行而不是 5 行)*我在 eclipse(mono JVM) 下启动 MapReduce 问题或使用带有 Hdfs 的 Hadoop 得到相同的结果

Here Console Log (Just for information, my input file has 6 rows instead of 5) *I obtain the same result starting MapReduce problem under eclipse(mono JVM) or using Hadoop with Hdfs

File System Counters
    FILE: Number of bytes read=1216
    FILE: Number of bytes written=431465
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=6
    Map output records=6
    Map output bytes=214
    Map output materialized bytes=232
    Input split bytes=97
    Combine input records=0
    Combine output records=0
    Reduce input groups=3
    Reduce shuffle bytes=232
    Reduce input records=6
    Reduce output records=6
    Spilled Records=12
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=0
    Total committed heap usage (bytes)=394264576
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters 
    Bytes Read=208
File Output Format Counters 
    Bytes Written=1813

推荐答案

主要问题是reduce方法的符号:

The main problem is about the sign of the reduce method :

我在写:public void reduce(Text key, Iterator<TextWritable> values,上下文上下文)

而不是

    public void reduce(Text key, Iterable<ArrayTextWritable> values,

这就是我获取 Map 输出而不是 Reduce 输出的原因

This is the reason why i obtain my Map output instead of my Reduce otuput

相关文章