Using Boost Multi-Index like a relational database
Here is the situation that I am trying to simulate:
COL1              COL2  COL3
CBT.151.5.T.FEED  S1    t1
CBT.151.5.T.FEED  s2    t2
CBT.151.5.T.FEED  s3    t3
CBT.151.5.T.FEED  s4    t4
CBT.151.5.T.FEED  s5    t1
CBT.151.8.T.FEED  s7    t1
CBT.151.5.Q.FEED  s8    t3
COL1 - the ID; a given ID can have several symbols.
COL2 - the symbols; they are unique.
COL3 - the update time of a symbol; two different symbols might update at the same time, so the times are not unique.
My aim is to get the tickers which are most active, let's say the symbols that have updated in the last 60 seconds. For this purpose I have used Boost Multi-Index.
The header file:
#ifndef __TICKER_INFO_MANAGER_IMPL__
#define __TICKER_INFO_MANAGER_IMPL__

#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <TickerInfoManagerConstants.h>
#include <TickerInfo.h>

namespace bmi = boost::multi_index;
namespace bip = boost::interprocess;

struct id_index{};
struct symbol_index{};
struct last_update_time_index{};

struct Less {
    template<class T, class U>
    bool operator()(T const& t, U const& u) const {
        return t < u;
    }
};

typedef bmi::multi_index_container<
    tickerUpdateInfoT,
    bmi::indexed_by<
        bmi::ordered_unique<
            bmi::tag<id_index>, BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, shm_string, m_id), Less>,
        bmi::ordered_unique<
            bmi::tag<symbol_index>, BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, shm_string, m_symbol), Less>,
        bmi::ordered_non_unique<
            bmi::tag<last_update_time_index>, BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, int, m_last_update_time), Less> >,
    bip::managed_shared_memory::allocator<tickerUpdateInfo>::type
> ticker_update_info_set;

class tickerInfoMangerImplementation {
public:
    tickerInfoMangerImplementation( const sharedMemoryNameT & name );
    bool put_records( const tickerUpdateInfoT & record );
    int get_active_ticker_count( const thresholdT seconds );
    void print_contents();
    bip::managed_shared_memory& get_managed_memory_segment() {
        return m_managed_memory_segment;
    }

private:
    const sharedMemoryNameT m_name;
    bip::managed_shared_memory m_managed_memory_segment;
    ticker_update_info_set *p_ticker_info_set;
};

#endif
The cpp file:
#include <TickerInfoMangerImplementation.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <iostream>
#include "basic_time.h"

using namespace boost::interprocess;

tickerInfoMangerImplementation::tickerInfoMangerImplementation( const sharedMemoryNameT & name )
    : m_name(name),
      m_managed_memory_segment( open_or_create, "test", 65536 )
{
    p_ticker_info_set = m_managed_memory_segment.find_or_construct<ticker_update_info_set>
        ("SetOfTickerUpdateInformation")  //Container's name in shared memory
        ( ticker_update_info_set::ctor_args_list()
        , m_managed_memory_segment.get_allocator<tickerUpdateInfoT>());  //Ctor parameters
}

bool tickerInfoMangerImplementation::put_records( const tickerUpdateInfoT & record ) {
    std::pair<ticker_update_info_set::iterator, bool> result_pair = p_ticker_info_set->insert( record );
    if( result_pair.second ) {
        return result_pair.second;
    }
    typedef ticker_update_info_set::index<symbol_index>::type ticker_update_info_set_by_symbol;
    ticker_update_info_set_by_symbol & sym_index = (*p_ticker_info_set).get<symbol_index>();
    ticker_update_info_set_by_symbol::iterator it = sym_index.find( record.m_symbol );
    tickerUpdateInfoT ticker_info = *it;
    ticker_info.m_last_update_time = record.m_last_update_time;
    return sym_index.replace( it, ticker_info );
}

int tickerInfoMangerImplementation::calculate_historical_time_using_threshold( const thresholdT seconds ) {
    basic_time::Secs_t seconds( threshold );
    basic_time tick_time;
    tick_time -= seconds;
    return ( tick_time.fullTime() );
}

int tickerInfoMangerImplementation::get_active_ticker_count( const thresholdT seconds, std::string key ) {
    typedef ticker_update_info_set::index<id_index>::type ticker_update_info_set_by_id;
    ticker_update_info_set_by_id & id_index = (*p_ticker_info_set).get<id_index>();
    int tick_time = calculate_historical_time_using_threshold( seconds );
    //Here I would like to find the key
    //Based on that key I would like to fetch all the symbols which have updated after a certain time(using lower bound)
    std::copy( it, time_index.end(), std::ostream_iterator<tickerUpdateInfoT>(std::cout) );
}

void tickerInfoMangerImplementation::print_contents() {
    const ticker_update_info_set::nth_index<1>::type& name_index = (*p_ticker_info_set).get<1>();
    std::copy( name_index.begin(), name_index.end(), std::ostream_iterator<tickerUpdateInfoT>(std::cout) );
}

std::ostream& operator<<(std::ostream& os, const tickerUpdateInfoT & obj) {
    os << obj.m_id << " ";
    os << obj.m_symbol << " ";
    os << obj.m_last_update_time << " " << "\n";
    return os;
}
Structure of a record that I would insert into the Boost multi-index container:
#ifndef __TICKER_INFO__
#define __TICKER_INFO__

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>

typedef boost::interprocess::managed_shared_memory::allocator<char>::type char_allocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, char_allocator> shm_string;

//Data to insert in shared memory
typedef struct tickerUpdateInfo {
    shm_string m_id;
    shm_string m_symbol;
    int m_last_update_time;

    tickerUpdateInfo( const char * id,
                      const char *symbol,
                      int last_update_time,
                      const char_allocator &a)
        : m_id( id, a), m_symbol( symbol, a), m_last_update_time( last_update_time) {
    }

    tickerUpdateInfo& operator=(const tickerUpdateInfo& other) {
        if (this != &other) {
            m_last_update_time = other.m_last_update_time;
        }
        return *this;
    }
} tickerUpdateInfoT;

#endif
Now in the function get_active_ticker_count() I want to specify the key like CBT.151.5.T.FEED and it should return:
S1 t1
s2 t2
s3 t3
s4 t4
s5 t1
Let's assume t1 > t2 > t3 > t4. I would then like to find the records whose times are greater than t3, and also the count of such symbols. How do I proceed? I have been able to insert, but I am stuck on the retrieval part. Please help!
Solution
I've simplified your (ridiculously complicated [1]) model to:
#include <iostream>
#include <string>

enum TimePoints { // Let's assume t1 > t2 > t3 > t4
    t1 = 100,
    t2 = 80,
    t3 = 70,
    t4 = 20,
};

using IdType = std::string;
using Symbol = std::string;
using TimeT  = unsigned int;

struct tickerUpdateInfo {
    IdType m_id;
    Symbol m_symbol;
    TimeT  m_last_update_time;

    friend std::ostream& operator<<(std::ostream& os, tickerUpdateInfo const& tui) {
        return os << "T[" << tui.m_id << ", " << tui.m_symbol << ", " << tui.m_last_update_time << "]";
    }
} static const data[] = {
    { "CBT.151.5.T.FEED", "S1", t1 },
    { "CBT.151.5.T.FEED", "s2", t2 },
    { "CBT.151.5.T.FEED", "s3", t3 },
    { "CBT.151.5.T.FEED", "s4", t4 },
    { "CBT.151.5.T.FEED", "s5", t1 },
    { "CBT.151.8.T.FEED", "s7", t1 },
    { "CBT.151.5.Q.FEED", "s8", t3 },
};
There. We can work with that. You want an index that's primarily time-based, which you can still refine by symbol/id later:
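#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/composite_key.hpp>
#include <boost/multi_index/member.hpp>

namespace bmi = boost::multi_index;

typedef bmi::multi_index_container<tickerUpdateInfo,
    bmi::indexed_by<
        bmi::ordered_non_unique<bmi::tag<struct most_active_index>,
            bmi::composite_key<tickerUpdateInfo,
                BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, TimeT,  m_last_update_time),
                BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, Symbol, m_symbol),
                BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, IdType, m_id)
            > > >
    > ticker_update_info_set;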
For our implementation, we don't even need to use the secondary key components; we can just write:
#include <boost/range/iterator_range.hpp>
#include <map>

std::map<Symbol, size_t> activity_histo(ticker_update_info_set const& tuis, TimeT since)
{
    std::map<Symbol, size_t> histo;
    auto const& index = tuis.get<most_active_index>();
    auto lb = index.upper_bound(since); // strictly newer than `since`; use lower_bound to include `since` itself
    for (auto& rec : boost::make_iterator_range(lb, index.end()))
        histo[rec.m_symbol]++;
    return histo;
}
See it Live On Coliru.
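The histogram above covers every ID, while the question asks for the symbols of one given ID. As a minimal sketch of that per-ID lookup, the block below uses only the standard library: a `std::multimap` keyed by `(id, time)` stands in for a composite-key index ordered by ID first and time second. The names `ById` and `active_symbols` are hypothetical and do not appear in the original code. With Boost.MultiIndex the same idea would presumably need a composite key that starts with `m_id`.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

using IdType = std::string;
using Symbol = std::string;
using TimeT  = unsigned int;

// Hypothetical stand-in for a composite-key index: records ordered by (id, time).
using ById = std::multimap<std::pair<IdType, TimeT>, Symbol>;

// Returns the symbols of `id` whose update time is strictly greater than `since`,
// in increasing time order.
std::vector<Symbol> active_symbols(ById const& recs, IdType const& id, TimeT since)
{
    std::vector<Symbol> out;
    // First record whose (id, time) key sorts strictly after (id, since).
    auto it = recs.upper_bound(ById::key_type(id, since));
    // Keys are ordered by id first, so the run for `id` ends where the id changes.
    for (; it != recs.end() && it->first.first == id; ++it)
        out.push_back(it->second);
    return out;
}
```

The size of the returned vector is the count of active symbols for that ID. Replacing `upper_bound` with `lower_bound` would also include records updated exactly at `since`.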
Now if volumes get large, you could be tempted to optimize a bit using the secondary index component:
#include <boost/tuple/tuple.hpp>
#include <iterator>
#include <map>

std::map<Symbol, size_t> activity_histo_ex(ticker_update_info_set const& tuis, TimeT since)
{
    std::map<Symbol, size_t> histo;
    auto const& index = tuis.get<most_active_index>();
    // strictly newer than `since`; use lower_bound to include `since` itself
    for (auto lb = index.upper_bound(since), end = index.end(); lb != end;)
    {
        // jump past every record sharing this (time, symbol) prefix
        auto ub = index.upper_bound(boost::make_tuple(lb->m_last_update_time, lb->m_symbol));
        histo[lb->m_symbol] += std::distance(lb, ub);
        lb = ub;
    }
    return histo;
}
I'm not sure this would become the quicker approach (your profiler would know). See it Live On Coliru too.
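To make the run-skipping idea easier to try in isolation, here is a minimal standard-library sketch of the same technique. It assumes a `std::multiset` of `(time, symbol)` pairs in place of the Boost index; `Key` and `histo_by_runs` are hypothetical names chosen for illustration.

```cpp
#include <cstddef>
#include <iterator>
#include <limits>
#include <map>
#include <set>
#include <string>
#include <utility>

using Symbol = std::string;
using TimeT  = unsigned int;
using Key    = std::pair<TimeT, Symbol>;   // ordered by time, then by symbol

// Counts records per symbol with time strictly greater than `since`,
// handling each run of equal (time, symbol) keys in one step.
std::map<Symbol, std::size_t> histo_by_runs(std::multiset<Key> const& recs, TimeT since)
{
    std::map<Symbol, std::size_t> histo;
    if (since == std::numeric_limits<TimeT>::max())
        return histo;                       // nothing can be newer than the maximum time

    // The empty symbol sorts before every other string, so this finds
    // the first key whose time is greater than `since`.
    auto lb = recs.lower_bound(Key(since + 1, Symbol()));
    while (lb != recs.end()) {
        auto ub = recs.upper_bound(*lb);    // one past the run of keys equal to *lb
        histo[lb->second] += static_cast<std::size_t>(std::distance(lb, ub));
        lb = ub;
    }
    return histo;
}
```

One reason the gain is uncertain: `std::distance` is linear for bidirectional iterators, so every record in a run is still visited once. The saving is limited to fewer map lookups per run.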
Rethink the design?
TBH this whole multi index thing is likely to slow you down due to suboptimal insertion times and lack of locality-of-reference when iterating records.
I'd suggest looking at
- a single flat_multimap ordered by update-time
- or even a (fixed-size) linear ring buffer ordered by time. This would make a lot of sense since you are most likely receiving the events in increasing time order anyway, so you can just keep appending at the end (and wrap around when the history window is full). This removes all need for reallocation at once (given that you choose an appropriate maximum capacity for the ring buffer) and gives you optimal cache prefetch performance when traversing the list for stats.
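The ring-buffer suggestion can be sketched roughly as follows. This is a single-threaded illustration using only the standard library, and it assumes updates arrive in non-decreasing time order. `Update` and `UpdateRing` are hypothetical names, and the sketch does not address shared memory or concurrent access.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

struct Update {
    std::string  symbol;
    unsigned int time;
};

// Hypothetical fixed-capacity ring buffer; assumes pushes arrive in non-decreasing time order.
class UpdateRing {
public:
    explicit UpdateRing(std::size_t capacity) : buf_(capacity) { assert(capacity > 0); }

    // Appends an update; once the buffer is full, the oldest entry is overwritten.
    void push(Update u) {
        buf_[(head_ + size_) % buf_.size()] = std::move(u);
        if (size_ < buf_.size())
            ++size_;
        else
            head_ = (head_ + 1) % buf_.size();
    }

    // Counts updates with time strictly greater than `since`, scanning newest to oldest.
    std::size_t count_since(unsigned int since) const {
        std::size_t n = 0;
        for (std::size_t i = size_; i-- > 0;) {
            Update const& u = buf_[(head_ + i) % buf_.size()];
            if (u.time <= since)
                break;              // everything older is at or before `since` as well
            ++n;
        }
        return n;
    }

private:
    std::vector<Update> buf_;       // allocated once, never resized
    std::size_t head_ = 0;          // index of the oldest element
    std::size_t size_ = 0;          // number of valid elements
};
```

Because the storage is allocated once and scanned contiguously from the newest entry backwards, a query only touches the records inside the time window.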
The second approach should really gain some merit once you implement the ring buffer using Boost Lockfree's spsc_queue offering. Why? Because you can host it in shared memory:
Shared-memory IPC synchronization (lock-free)
[1] The complexity would be warranted iff your code had been self-contained. Sadly, it was not (at all). I had to prune it in order to get something to work. This was, obviously, after removing all line numbers :)