使用 boost::iostreams::mapped_file_source 和 std::multimap

2021-12-24 00:00:00 c++ boost memory-mapped-files multimap

我有大量数据要分析 - 每个文件大约 5gigs.每个文件的格式如下:

I have a rather large amount of data to analyse - each file is about 5gigs. Each file is of the following format:

xxxxx yyyyy

键和值都可以重复,但键是按升序排列的.我正在尝试为此使用内存映射文件,然后找到所需的键并使用它们.这是我写的:

Both key and value can repeat, but the keys are sorted in increasing order. I'm trying to use a memory mapped file for this purpose and then find the required keys and work with them. This is what I've written:

if (data_file != "")
{
    clock_start = clock();
    data_file_mapped.open(data_file);

    data_multimap = (std::multimap<double, unsigned int> *)data_file_mapped.data();
    if (data_multimap != NULL)
    {
        std::multimap<double, unsigned int>::iterator it = data_multimap->find(keys_to_find[4]);
        if (it != data_multimap->end())
        {
            std::cout << "Element found.";
            for (std::multimap<double, unsigned int>::iterator it = data_multimap->lower_bound(keys_to_find[4]); it != data_multimap->upper_bound(keys_to_find[5]); ++it)
            {
                std::cout << it->second;
            }
            std::cout << "
";
            clock_end = clock();

            std::cout << "Time taken to read in the file: " << (clock_end - clock_start)/CLOCKS_PER_SEC << "
";
        }
        else
            std::cerr << "Element not found at all" << "
";
    }
    else
        std::cerr << "Nope - no data received."<< "
";
}

基本上,我需要找到键的范围并将这些块拉出来进行处理.我第一次尝试在多图上使用方法时出现段错误.例如,当 find 方法被调用时.我也尝试了upper_boundlower_bound 和其他方法,但仍然出现段错误.

Basically, I need to locate ranges of keys and pull those chunks out to work on. I get a segfault the first time I try to use a method on the multimap. For example, when the find method is called. I tried the upper_bound, lower_bound and other methods too, and still get a segfault.

这就是 gdb 给我的:

Program received signal SIGSEGV, Segmentation fault.
_M_lower_bound (this=<optimized out>, __k=<optimized out>, __y=<optimized out>, __x=0xa31202030303833) at /usr/include/c++/4.9.2/bits/stl_tree.h:1261
1261            if (!_M_impl._M_key_compare(_S_key(__x), __k))

有人可以指出我做错了什么吗?我只能在内存映射文件上找到简单的例子――还没有这样的例子.谢谢.

Could someone please point out what I'm doing wrong? I've only been able to find simplistic examples on memory mapped files - nothing like this yet. Thanks.

更多信息:

我上面描述的文件基本上是一个两列纯文本文件,神经模拟器将其作为模拟的输出.就这么简单:

The file I described above is basically a two column plain text file which a neural simulator gives me as the output of my simulations. It's simple like this:

$ du -hsc 201501271755.e.ras 
4.9G    201501271755.e.ras
4.9G    total
$ head 201501271755.e.ras 
0.013800  0
0.013800  1
0.013800  10
0.013800  11
0.013800  12
0.013800  13
0.013800  14
0.013800  15
0.013800  16
0.013800  17

第一列是时间,第二列是此时发射的神经元 - (这是一个尖峰时间光栅文件).实际上,输出是用于运行模拟的每个 MPI 等级的类似文件.使用 sort -g -m 将各种文件组合到该主文件中.有关文件格式的更多信息,请访问:http://www.fzenke.net/auryn/doku.php?id=manual:ras

The first column is time, the second column is the neurons that fired at this time - (it's a spike time raster file). Actually, the output is a file like this from each MPI rank that is being used to run the simulation. The various files have been combined to this master file using sort -g -m. More information on the file format is here: http://www.fzenke.net/auryn/doku.php?id=manual:ras

为了在模拟的特定时间计算神经元集的放电率和其他指标,我需要 - 在文件中定位时间,在 [time -1,time] 之间拉出一个块并运行一些指标和在这个块上等等.这个文件非常小,随着我的模拟变得越来越复杂并且运行时间更长,我预计它的大小会增加很多.这就是我开始研究内存映射文件的原因.我希望能在某种程度上澄清问题陈述.我只需要读取输出文件来处理它包含的信息.我根本不需要修改这个文件.

To calculate the firing rate and other metrics of the neuron set at certain times of the simulation, I need to - locate the time in the file, pull out a chunk between [time -1,time] and run some metrics and so on on this chunk. This file is quite small and I expect the size to increase quite a bit as my simulations get more and more complicated and run for longer time periods. It's why I began looking into memory mapped files. I hope that clarifies the problem statement somewhat. I only need to read the output file to process the information it contains. I do not need to modify this file at all.

为了处理数据,我将再次使用多个线程,但由于我在地图上的所有操作都是只读的,我不希望在那里遇到麻烦.

To process the data, I'll use multiple threads again, but since all my operations on the map are read-only, I don't expect to run into trouble there.

推荐答案

多个地图在内存中不是按顺序排列的.(它们是基于节点的容器,但我离题了).事实上,即使是这样,布局与文本输入匹配的可能性也很小.

Multi maps aren't laid out sequentially in memory. (They're node-based containers, but I digress). In fact, even if they were, chances would be slim that the layout would match that of the text input.

基本上有两种方法可以完成这项工作:

There's basically two ways you can make this work:

  1. 继续使用 multimap 但使用自定义分配器(以便所有分配都在映射的内存区域中完成).这是最好的"从高级 C++ 的角度来看,/但是/您需要将文件更改为二进制格式.
  1. Keep using the multimap but use a custom allocator (so that all allocations are done in the mapped memory region). This is the "nicest" from a high-level C++ viewpoint, /but/ you will need to change to a binary format of your file.

如果可以,这就是我的建议.Boost Container + Boost Interprocess 拥有让这一切变得相对轻松所需的一切.

If you can, this is what I'd suggest. Boost Container + Boost Interprocess have everything you need to make this relatively painless.

  1. 您编写了一个自定义容器抽象"直接作用于映射数据.你可以

  1. You write a custom container "abstraction" that works directly on the mapped data. You could either

  • 识别xxxx yyyy"从任何地方配对(行结束?)或
  • 建立文件中所有行开始的索引.

使用这些,您可以设计一个可用于实现更高级别操作(lower_boundupper_boundequal_range).

Using these you can devise an interator (Boost Iterator iterator_facade) that you can use to implement higher level operations (lower_bound, upper_bound and equal_range).

一旦你有了这些,你基本上就可以把这个内存映射作为只读的键值数据库来查询了.

Once you have these, you're basically all set to query this memory map as a readonly key-value database.

遗憾的是,如果您还想支持变异操作(insertremove),这种内存表示对性能非常.

Sadly, this kind of memory representation would be extremely bad for performance if you also want to support mutating operations (insert, remove).

如果您有文件的实际样本,我可以对所描述的任何一种方法进行演示.

If you have an actual sample of the file, I could do a demonstration of either of the approaches described.

快速示例:

  1. 使用 boost::interprocess,您可以(非常)简单地定义您想要的多图:

  1. With boost::interprocess you can (very) simply define the multimap you desire:

namespace shared {
    namespace bc = boost::container;

    template <typename T> using allocator = bip::allocator<T, bip::managed_mapped_file::segment_manager>;
    template <typename K, typename V>
        using multi_map = bc::flat_multimap<
            K, V, std::less<K>, 
            allocator<typename bc::flat_multimap<K, V>::value_type> >;
}

注意事项:

  • 我选择了 flatmap(实际上是 flat_multimap)因为它可能更多存储效率高,与第二种方法更具可比性(下面给出);

  • I chose flatmap (flat_multimap, actually) because it is likely more storage efficient, and is much more comparable to the second approach (given below);

注意这个选择会影响迭代器/引用的稳定性,并且会非常喜欢只读操作.如果你需要迭代器稳定性和/或许多变异操作,使用常规 map(或用于hash_map) 而不是 flat 变体.

Note that this choice affects iterator/reference stability and will favours read-only operations pretty heavily. If you need iterator stability and/or many mutating operations, use a regular map (or for very high volumes a hash_map) instead of the flat variations.

我为这个演示选择了一个 managed_mapped_file 段(这样你就可以获得持久性).该演示展示了 10G 如何被稀疏地预分配,但只有实际分配的空间在磁盘上使用.您同样可以使用 managed_shared_memory.

I chose a managed_mapped_file segment for this demonstration (so you get persistence). The demo shows how 10G is sparsely pre-allocated, but only the space actually allocated is used on disk. You could equally well use a managed_shared_memory.

如果你有二进制持久性,你可能完全丢弃文本数据文件.

If you have binary persistence, you might discard the text datafile altogether.

我使用 Boost Spirit 将文本数据从 mapped_file_source 解析为 shared::multi_map.实现是完全通用的.

I parse the text data into a shared::multi_map<double, unsigned> from a mapped_file_source using Boost Spirit. The implementation is fully generic.

不需要写iterator类、start_of_line()end_of_line()lower_bound()upper_bound()equal_range() 或其中任何一个,因为它们在 multi_map 接口中已经是标准的,所以我们只需要编写 main:

There is no need to write iterator classes, start_of_line(), end_of_line(), lower_bound(), upper_bound(), equal_range() or any of those, since they're already standard in the multi_map interface, so all we need to is write main:

生活在 Coliru

#define NDEBUG
#undef DEBUG
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/fusion/adapted/std_pair.hpp>
#include <boost/container/flat_map.hpp>
#include <boost/interprocess/managed_mapped_file.hpp>
#include <boost/spirit/include/qi.hpp>
#include <iomanip>

namespace bip = boost::interprocess;
namespace qi = boost::spirit::qi;

namespace shared {
    namespace bc = boost::container;

    template <typename T> using allocator = bip::allocator<T, bip::managed_mapped_file::segment_manager>;
    template <typename K, typename V>
        using multi_map = bc::flat_multimap<
            K, V, std::less<K>, 
            allocator<typename bc::flat_multimap<K, V>::value_type> >;
}

#include <iostream>

bip::managed_mapped_file msm(bip::open_or_create, "lookup.bin", 10ul<<30);

template <typename K, typename V>
shared::multi_map<K,V>& get_or_load(const char* fname) {
    using Map = shared::multi_map<K, V>;
    Map* lookup = msm.find_or_construct<Map>("lookup")(msm.get_segment_manager());

    if (lookup->empty()) { 
        // only read input file if not already loaded
        boost::iostreams::mapped_file_source input(fname);
        auto f(input.data()), l(f + input.size());

        bool ok = qi::phrase_parse(f, l,
                (qi::auto_ >> qi::auto_) % qi::eol >> *qi::eol, 
                qi::blank, *lookup);

        if (!ok || (f!=l))
            throw std::runtime_error("Error during parsing at position #" + std::to_string(f - input.data()));
    }

    return *lookup;
}

int main() {
    // parse text file into shared memory binary representation
    auto const& lookup = get_or_load<double, unsigned int>("input.txt");
    auto const e = lookup.end();

    for(auto&& line : lookup)
    {
        std::cout << line.first << "	" << line.second << "
";

        auto er = lookup.equal_range(line.first);

        if (er.first  != e) std::cout << " lower: " << er.first->first  << "	" << er.first->second  << "
";
        if (er.second != e) std::cout << " upper: " << er.second->first << "	" << er.second->second << "
";
    }
}

  • 我完全按照我的描述实现了它:

  • I implemented it exactly as I described:

    • 原始const char*区域映射的简单容器;

    使用 boost::iterator_facade 制作一个迭代器,在解引用时解析文本;

    using boost::iterator_facade to make an iterator that parses the text on dereference;

    用于打印输入行,我使用 boost::string_ref - 这避免了复制字符串的动态分配.

    for printing the input lines I use boost::string_ref - which avoids dynamic allocations for copying strings.

    解析是用灵气完成的:

      if (!qi::phrase_parse(
                  b, _data.end,
                  qi::auto_ >> qi::auto_ >> qi::eoi,
                  qi::space,
                  _data.key, _data.value)) 
    

    选择 Qi 是为了速度和通用性:您可以在实例化时选择 KeyValue 类型:

    Qi was chosen for speed and genericity: you can choose the Key and Value types at instantiation time:

      text_multi_lookup<double, unsigned int> tml(map.data(), map.data() + map.size());
    

  • 我已经实现了 lower_boundupper_boundequal_range 成员函数,它们利用了底层连续存储.即使线"iterator 不是随机访问的而是双向的,我们仍然可以跳转到这样一个迭代器范围的mid_point,因为我们可以从任意位置获取start_of_lineconst char* 到底层映射区域.这使得二分查找更加高效.

  • I've implemented lower_bound, upper_bound and equal_range member functions that take advantage of underlying contiguous storage. Even though the "line" iterator is not random-access but bidirectional, we can still jump to the mid_point of such an iterator range because we can get the start_of_line from any const char* into the underlying mapped region. This make binary searching efficient.

    请注意,此解决方案会在取消引用 iterator 时解析行.如果多次取消引用相同的行,这可能效率不高.

    Note that this solution parses lines on dereference of the iterator. This might not be efficient if the same lines are dereferenced a lot of times.

    但是,对于不频繁的查找,或在输入数据的同一区域中不常见的查找,这与它可能获得的效率差不多(仅执行最低要求的解析和 O(log n) 二进制搜索),同时完全通过映射文件来绕过初始加载时间(无访问意味着不需要加载任何内容).

    But, for infrequent lookups, or lookups that are not typical in the same region of the input data, this is about as efficient as it can possibly get (doing only minimum required parsing and O(log n) binary searching), all the while completely bypassing the initial load time by mapping the file instead (no access means nothing needs to be loaded).

    生活在 Coliru(包括测试数据)

    #define NDEBUG
    #undef DEBUG
    #include <boost/iostreams/device/mapped_file.hpp>
    #include <boost/utility/string_ref.hpp>
    #include <boost/optional.hpp>
    #include <boost/spirit/include/qi.hpp>
    #include <thread>
    #include <iomanip>
    
    namespace io = boost::iostreams;
    namespace qi = boost::spirit::qi;
    
    template <typename Key, typename Value> 
    struct text_multi_lookup {
        text_multi_lookup(char const* begin, char const* end)
            : _map_begin(begin), 
              _map_end(end)
        {
        }
    
      private:
        friend struct iterator;
        enum : char { nl = '
    ' };
    
        using rawit = char const*;
        rawit _map_begin, _map_end;
    
        rawit start_of_line(rawit it) const {
            while (it > _map_begin) if (*--it == nl) return it+1;
            assert(it == _map_begin);
            return it;
        }
    
        rawit end_of_line(rawit it) const {
            while (it < _map_end) if (*it++ == nl) return it;
            assert(it == _map_end);
            return it;
        }
    
      public:
        struct value_type final {
            rawit beg, end;
            Key   key;
            Value value;
    
            boost::string_ref str() const { return { beg, size_t(end-beg) }; }
        };
    
        struct iterator : boost::iterator_facade<iterator, boost::string_ref, boost::bidirectional_traversal_tag, value_type> {
    
            iterator(text_multi_lookup const& d, rawit it) : _region(&d), _data { it, nullptr, Key{}, Value{} } { 
                assert(_data.beg == _region->start_of_line(_data.beg));
            }
    
          private:
            friend text_multi_lookup;
    
            text_multi_lookup const* _region;
            value_type mutable _data;
    
            void ensure_parsed() const {
                if (!_data.end) 
                {
                    assert(_data.beg == _region->start_of_line(_data.beg));
                    auto b = _data.beg;
                    _data.end = _region->end_of_line(_data.beg);
    
                    if (!qi::phrase_parse(
                                b, _data.end,
                                qi::auto_ >> qi::auto_ >> qi::eoi,
                                qi::space,
                                _data.key, _data.value)) 
                    {
                        std::cerr << "Problem in: " << std::string(_data.beg, _data.end) 
                                  << "at:         " << std::setw(_data.end-_data.beg) << std::right << std::string(_data.beg,_data.end);
                        assert(false);
                    }
                }
            }
    
            static iterator mid_point(iterator const& a, iterator const& b) {
                assert(a._region == b._region);
                return { *a._region, a._region->start_of_line(a._data.beg + (b._data.beg -a._data.beg)/2) };
            }
    
          public:
            value_type const& dereference() const {
                ensure_parsed();
                return _data;
            }
    
            bool equal(iterator const& o) const {
                return (_region == o._region) && (_data.beg == o._data.beg);
            }
    
            void increment() {
                _data = { _region->end_of_line(_data.beg), nullptr, Key{}, Value{} };
                assert(_data.beg == _region->start_of_line(_data.beg));
            }
        };
    
        using const_iterator = iterator;
    
        const_iterator begin()  const { return { *this, _map_begin }; }
        const_iterator end()    const { return { *this, _map_end   }; }
        const_iterator cbegin() const { return { *this, _map_begin }; }
        const_iterator cend()   const { return { *this, _map_end   }; }
    
        template <typename CompatibleKey>
        const_iterator lower_bound(CompatibleKey const& key) const {
            auto f(begin()), l(end());
            while (f!=l) {
                auto m = iterator::mid_point(f,l);
    
                if (m->key < key) {
                    f = m;
                    ++f;
                }
                else {
                    l = m;
                }
            }
            return f;
        }
    
        template <typename CompatibleKey>
        const_iterator upper_bound(CompatibleKey const& key) const {
            return upper_bound(key, begin());
        }
    
      private:
        template <typename CompatibleKey>
        const_iterator upper_bound(CompatibleKey const& key, const_iterator f) const {
            auto l(end());
            while (f!=l) {
                auto m = iterator::mid_point(f,l);
    
                if (key < m->key) {
                    l = m;
                }
                else {
                    f = m;
                    ++f;
                }
            }
            return f;
        }
    
      public:
        template <typename CompatibleKey>
        std::pair<const_iterator, const_iterator> equal_range(CompatibleKey const& key) const {
            auto lb = lower_bound(key);
            return { lb, upper_bound(key, lb) };
        }
    
    };
    
    #include <iostream>
    
    int main() {
        io::mapped_file_source map("input.txt");
        text_multi_lookup<double, unsigned int> tml(map.data(), map.data() + map.size());
    
        auto const e = tml.end();
    
        for(auto&& line : tml)
        {
            std::cout << line.str();
    
            auto er = tml.equal_range(line.key);
    
            if (er.first  != e) std::cout << " lower: " << er.first->str();
            if (er.second != e) std::cout << " upper: " << er.second->str();
        }
    }
    

  • 好奇:这是拆解.请注意所有算法内容如何直接内联到 main:http://paste.ubuntu.com/9946135/

    For the curious: here's the disassembly. Note how all the algorithmic stuff is inlined right into main: http://paste.ubuntu.com/9946135/

    相关文章