CombineFileInputFormat Hadoop 0.20.205 的实现

2022-01-13 00:00:00 mapreduce java inputformatter

有人可以指出我在哪里可以找到 CombineFileInputFormat 的实现(组织.使用 Hadoop 0.20.205?这是使用 EMR 从非常小的日志文件(行中的文本)创建大拆分.

Can someone please point out where I could find an implementation for CombineFileInputFormat (org. using Hadoop 0.20.205? this is to create large splits from very small log files (text in lines) using EMR.

令人惊讶的是,Hadoop 没有专门为此目的创建的此类的默认实现,并且在谷歌上搜索看起来好像我不是唯一对此感到困惑的人.我需要编译该类并将其捆绑在一个 jar 中用于 hadoop-streaming,但对 Java 的了解有限,这是一个挑战.

It is surprising that Hadoop does not have a default implementation for this class made specifically for this purpose and googling it looks like I'm not the only one confused by this. I need to compile the class and bundle it in a jar for hadoop-streaming, with a limited knowledge of Java this is some challenge.

我已经尝试过使用必要导入的 Yetitrails 示例,但我得到下一个方法的编译器错误.

I already tried the yetitrails example, with the necessary imports but I get a compiler error for the next method.

推荐答案

这是我为你准备的一个实现:

Here is an implementation I have for you:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

        return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
    }

    public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
        private final LineRecordReader linerecord;

        public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
            FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
            linerecord = new LineRecordReader(conf, filesplit);
        }

        @Override
        public void close() throws IOException {
            linerecord.close();

        }

        @Override
        public LongWritable createKey() {
            // TODO Auto-generated method stub
            return linerecord.createKey();
        }

        @Override
        public Text createValue() {
            // TODO Auto-generated method stub
            return linerecord.createValue();
        }

        @Override
        public long getPos() throws IOException {
            // TODO Auto-generated method stub
            return linerecord.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            // TODO Auto-generated method stub
            return linerecord.getProgress();
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {

            // TODO Auto-generated method stub
            return linerecord.next(key, value);
        }

    }
}

在您的工作中,首先根据您希望将输入文件组合成的大小设置参数 mapred.max.split.size.在您的 run() 中执行以下操作:

In your job first set the parameter mapred.max.split.size according to the size you would like the input files to be combined into. Do something like follows in your run():

...
            if (argument != null) {
                conf.set("mapred.max.split.size", argument);
            } else {
                conf.set("mapred.max.split.size", "134217728"); // 128 MB
            }
...

            conf.setInputFormat(CombinedInputFormat.class);
...

相关文章