Extending SequenceFileInputFormat to include file name + offset

2022-01-14 hadoop mapreduce java sequencefile

I would like to be able to create a custom InputFormat that reads sequence files, but additionally exposes the file path and offset within that file where the record is located.

To take a step back, here's the use case: I have a sequence file containing variably-sized data. The keys are mostly irrelevant, and the values are up to a couple of megabytes each, containing a variety of different fields. I would like to index some of these fields in Elasticsearch along with the file name and offset. This way, I can query those fields from Elasticsearch, and then use the file name and offset to go back to the sequence file and obtain the original record, instead of storing the whole thing in ES.

I have this whole process working as a single Java program. The SequenceFile.Reader class conveniently provides getPosition and seek methods to make this happen.
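
For reference, the core of that single-process version boils down to the sketch below. It is only a sketch: indexRecord stands in for the hypothetical Elasticsearch indexing call and is not a real API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class OffsetScanSketch {

    // pass 1: walk the file, noting the offset at which each record starts
    static void indexFile(Configuration conf, Path path) throws Exception {
        FileSystem fs = path.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            // getPosition() before next() is the offset where the next record starts
            long offset = reader.getPosition();
            while (reader.next(key, value)) {
                indexRecord(path, offset, value); // stand-in for the ES indexing call
                offset = reader.getPosition();
            }
        } finally {
            reader.close();
        }
    }

    // pass 2: given a (path, offset) hit from the index, fetch the original record
    static void fetchRecord(Configuration conf, Path path, long offset,
                            Writable key, Writable value) throws Exception {
        FileSystem fs = path.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            reader.seek(offset); // offset must be one previously returned by getPosition()
            reader.next(key, value);
        } finally {
            reader.close();
        }
    }

    static void indexRecord(Path path, long offset, Writable value) { /* hypothetical */ }
}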

However, there will eventually be many terabytes of data involved, so I will need to convert this to a MapReduce job (probably map-only). Since the actual keys in the sequence file are irrelevant, the approach I had hoped to take would be to create a custom InputFormat that extends or somehow utilizes SequenceFileInputFormat but, instead of returning the actual keys, returns a composite key consisting of the file and offset.

However, that's proving to be more difficult in practice. It seems like it should be possible, but given the actual APIs and what's exposed, it's tricky. Any ideas? Or is there an alternative approach I should take?

Accepted answer

In case anyone encounters a similar problem, here's the solution I came up with. I ended up simply duplicating some of the code in SequenceFileInputFormat/RecordReader and just modifying it. I had hoped to write either a subclass or a decorator or something... this way is not pretty, but it works:

SequenceFileOffsetInputFormat.java:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> {

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> {

        private SequenceFile.Reader in;
        private long start;
        private long end;
        private boolean more = true;
        private PathOffsetWritable key = null;
        private Writable k = null;
        private V value = null;
        private Configuration conf;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) split;
            conf = context.getConfiguration();
            Path path = fileSplit.getPath();
            FileSystem fs = path.getFileSystem(conf);
            this.in = new SequenceFile.Reader(fs, path, conf);
            try {
                this.k = (Writable) in.getKeyClass().newInstance();
                this.value = (V) in.getValueClass().newInstance();
            } catch (InstantiationException e) {
                throw new IOException(e);
            } catch (IllegalAccessException e) {
                throw new IOException(e);
            }
            this.end = fileSplit.getStart() + fileSplit.getLength();

            // if the split starts past the reader's current position, skip forward
            // to the first sync mark at or after the split start so that reading
            // begins on a record boundary
            if (fileSplit.getStart() > in.getPosition()) {
                in.sync(fileSplit.getStart());
            }

            this.start = in.getPosition();
            more = start < end;

            key = new PathOffsetWritable(path, start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!more) {
                return false;
            }
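            // capture the reader position before the read: this is the byte offset
            // where the record we are about to read starts, i.e. the value a later
            // seek() call needs in order to get back to it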
            long pos = in.getPosition();

            more = in.next(k, value);
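            // a record that starts at or past the split end once a sync mark has
            // been seen belongs to the next split, so treat that as end-of-input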
            if (!more || (pos >= end && in.syncSeen())) {
                key = null;
                value = null;
                more = false;
            } else {
                key.setOffset(pos);
            }
            return more;
        }

        @Override
        public PathOffsetWritable getCurrentKey() {
            return key;
        }

        @Override
        public V getCurrentValue() {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (end == start) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
            }
        }

        @Override
        public void close() throws IOException {
            in.close();
        }

    }

    @Override
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new SequenceFileOffsetRecordReader<V>();
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context);
    }

    @Override
    public long getFormatMinSplitSize() {
        return SequenceFile.SYNC_INTERVAL;
    }


}

PathOffsetWritable.java:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> {

    private Text t = new Text();
    private Path path;
    private long offset;

    // a no-arg constructor is required so that Hadoop can instantiate the key
    // via reflection before calling readFields()
    public PathOffsetWritable() {
    }

    public PathOffsetWritable(Path path, long offset) {
        this.path = path;
        this.offset = offset;
    }

    public Path getPath() {
        return path;
    }

    public long getOffset() {
        return offset;
    }

    public void setPath(Path path) {
        this.path = path;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        t.readFields(in);
        path = new Path(t.toString());
        offset = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        t.set(path.toString());
        t.write(out);
        out.writeLong(offset);
    }

    @Override
    public int compareTo(PathOffsetWritable o) {
        int x = path.compareTo(o.path);
        if (x != 0) {
            return x;
        } else {
            return Long.compare(offset, o.offset);
        }
    }


}
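
For completeness, wiring this into a map-only job would look something like the sketch below. IndexMapper and its Elasticsearch plumbing are hypothetical, and it assumes the file's values are BytesWritable; substitute your actual value class.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class OffsetIndexJob {

    // hypothetical mapper: pulls the interesting fields out of the value and ships
    // them to Elasticsearch together with key.getPath() and key.getOffset()
    public static class IndexMapper
            extends Mapper<PathOffsetWritable, BytesWritable, NullWritable, NullWritable> {
        @Override
        protected void map(PathOffsetWritable key, BytesWritable value, Context context) {
            // index selected fields plus (key.getPath(), key.getOffset()) here
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "sequence-file-offset-index");
        job.setJarByClass(OffsetIndexJob.class);
        job.setInputFormatClass(SequenceFileOffsetInputFormat.class);
        job.setMapperClass(IndexMapper.class);
        job.setNumReduceTasks(0); // map-only, as described above
        job.setOutputFormatClass(NullOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}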
