boost::asio 和活动对象
我已经实现了一些基于模块的 Active Object 设计模式.这是非常简单的实现.我有调度程序、激活列表、请求和期货来获得响应.我的要求是这样的:
I have implemented some module based Active Object design pattern. It is very simple implementation. I have Scheduler, ActivationList, Requests and Futures to get response. My requirements were like that:
- 对活动对象的访问应通过执行其方法进行序列化在它自己的线程中(主要请求和活动对象的假设设计模式)
- 调用者应能够指定请求执行的优先级.这意味着如果等待执行的请求超过零个,它们将按照分配给每个请求的优先级排序.优先级较高的请求应首先执行,因此如果 ActivationList 上总是有一些请求未决,并且它们的优先级高于给定请求,则永远不会执行此请求 - 对我来说没问题
- 应该可以指定列表中待处理的最大请求数(限制内存使用)
- 可以使所有未决请求无效
- 请求应该能够返回值(阻塞调用者)或者只执行没有返回值但调用者应该被阻塞直到请求被处理或者调用者不应被阻塞并且如果给定的请求已经被处理与否g
- 就在请求执行之前,将执行一些保护方法以检查是否应执行给定的请求.如果不是 - 它应该向调用者返回一些未定义的值(在我当前的实现中它是 boost::none,因为每个请求返回类型都是 boost::optional)
好的,现在问题:是否可以使用 boost::asio 并满足我的所有要求?我的实现正在运行,但我想使用一些可能比我这样做更好的方式实现的东西.我也想知道它的未来,不要再次重新发明轮子".
OK now question: Is it possible to use boost::asio and fulfill all my requirements? My implementation is working but I would like to use something what is probably implemented in much better way than I have done this. Also I would like to know it for the future and do not "reinvent the wheel" once again.
推荐答案
Boost.Asio 可用于包含 Active Object:将方法执行与方法调用分离.需要在更高级别处理其他要求,但将 Boost.Asio 与其他 Boost 库结合使用时不会过于复杂.
Boost.Asio can be used to encompass the intention of Active Object: decouple method execution from method invocation. Additional requirements will need to be handled at a higher-level, but it is not overly complex when using Boost.Asio in conjunction with other Boost libraries.
Scheduler
可以使用:
boost::thread代码>用于线程抽象.
boost::thread_group代码>来管理线程的生命周期.
boost::asio::io_service
提供线程池.可能想要使用boost::asio::io_service::work
在没有待处理的工作时保持线程处于活动状态.
ActivationList
可以实现为:
- 用于获取最高优先级方法请求的 Boost.MultiIndex.使用提示位置
insert()
,为具有相同优先级的请求保留插入顺序. 可以使用 std::multiset
或std::multimap
.但是,在 C++03 中,对于具有相同键(优先级)的请求的顺序没有指定.- 如果
Request
不需要保护方法,则可以使用std::priority_queue
.
- A Boost.MultiIndex for obtaining highest priority method request. With a hinted-position
insert()
, the insertion order is preserved for request with the same priority. std::multiset
orstd::multimap
can be used. However, it is unspecified in C++03 as to the order of request with the same key (priority).- If
Request
do not need an guard method, thenstd::priority_queue
could be used.
Request
可以是未指定的类型:
Request
could be an unspecified type:
boost::function
和boost::bind
可用于提供类型擦除,同时绑定到可调用类型而不引入Request
层次结构.
boost::function
andboost::bind
could be used to provide a type-erasure, while binding to callable types without introducing aRequest
hierarchy.
Futures
可以使用 Boost.Thread 的 期货 支持.
Futures
could use Boost.Thread's Futures support.
- 如果
future.valid()
将返回 true.future.wait()
将阻止等待结果可用.future.get()
会阻塞等待结果.- 如果调用者没有对
future
做任何事情,那么调用者将不会被阻止. - 使用 Boost.Thread 的 Futures 的另一个好处是源自
Request
的异常将被传递到Future
.
Request
已添加到 ActivationList
,future.valid()
will return true ifRequest
has been added toActivationList
.future.wait()
will block waiting for a result to become available.future.get()
will block waiting for the result.- If caller does nothing with the
future
, then caller will not be blocked. - Another benefit to using Boost.Thread's Futures is that exceptions originating from within a
Request
will be passed to theFuture
.
这是一个利用各种 Boost 库的完整示例,应满足要求:
Here is a complete example leveraging various Boost libraries and should meet the requirements:
// Standard includes
#include <algorithm> // std::find_if
#include <iostream>
#include <string>
// 3rd party includes
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/utility/result_of.hpp>
/// @brief scheduler that provides limits with prioritized jobs.
template <typename Priority,
typename Compare = std::less<Priority> >
class scheduler
{
public:
typedef Priority priority_type;
private:
/// @brief method_request is used to couple the guard and call
/// functions for a given method.
struct method_request
{
typedef boost::function<bool()> ready_func_type;
typedef boost::function<void()> run_func_type;
template <typename ReadyFunctor,
typename RunFunctor>
method_request(ReadyFunctor ready,
RunFunctor run)
: ready(ready),
run(run)
{}
ready_func_type ready;
run_func_type run;
};
/// @brief Pair type used to associate a request with its priority.
typedef std::pair<priority_type,
boost::shared_ptr<method_request> > pair_type;
static bool is_method_ready(const pair_type& pair)
{
return pair.second->ready();
}
public:
/// @brief Construct scheduler.
///
/// @param max_threads Maximum amount of concurrent task.
/// @param max_request Maximum amount of request.
scheduler(std::size_t max_threads,
std::size_t max_request)
: work_(io_service_),
max_request_(max_request),
request_count_(0)
{
// Spawn threads, dedicating them to the io_service.
for (std::size_t i = 0; i < max_threads; ++i)
threads_.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service_));
}
/// @brief Destructor.
~scheduler()
{
// Release threads from the io_service.
io_service_.stop();
// Cleanup.
threads_.join_all();
}
/// @brief Insert a method request into the scheduler.
///
/// @param priority Priority of job.
/// @param ready_func Invoked to check if method is ready to run.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename ReadyFunctor,
typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(priority_type priority,
const ReadyFunctor& ready_func,
const RunFunctor& run_func)
{
typedef typename boost::result_of<RunFunctor()>::type result_type;
typedef boost::unique_future<result_type> future_type;
boost::unique_lock<mutex_type> lock(mutex_);
// If max request has been reached, then return an invalid future.
if (max_request_ &&
(request_count_ == max_request_))
return future_type();
++request_count_;
// Use a packaged task to handle populating promise and future.
typedef boost::packaged_task<result_type> task_type;
// Bind does not work with rvalue, and packaged_task is only moveable,
// so allocate a shared pointer.
boost::shared_ptr<task_type> task =
boost::make_shared<task_type>(run_func);
// Create method request.
boost::shared_ptr<method_request> request =
boost::make_shared<method_request>(
ready_func,
boost::bind(&task_type::operator(), task));
// Insert into priority. Hint to inserting as close to the end as
// possible to preserve insertion order for request with same priority.
activation_list_.insert(activation_list_.end(),
pair_type(priority, request));
// There is now an outstanding request, so post to dispatch.
io_service_.post(boost::bind(&scheduler::dispatch, this));
return task->get_future();
}
/// @brief Insert a method request into the scheduler.
///
/// @param ready_func Invoked to check if method is ready to run.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename ReadyFunctor,
typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(const ReadyFunctor& ready_func,
const RunFunctor& run_func)
{
return insert(priority_type(), ready_func, run_func);
}
/// @brief Insert a method request into the scheduler.
///
/// @param priority Priority of job.
/// @param run_func Invoked when ready to run.
///
/// @return future associated with the method.
template <typename RunFunctor>
boost::unique_future<typename boost::result_of<RunFunctor()>::type>
insert(priority_type priority,
const RunFunctor& run_func)
{
return insert(priority, &always_ready, run_func);
}
/// @brief Insert a method request with default priority into the
/// scheduler.
///
/// @param run_func Invoked when ready to run.
///
/// @param functor Job to run.
///
/// @return future associated with the job.
template <typename RunFunc>
boost::unique_future<typename boost::result_of<RunFunc()>::type>
insert(const RunFunc& run_func)
{
return insert(&always_ready, run_func);
}
/// @brief Cancel all outstanding request.
void cancel()
{
boost::unique_lock<mutex_type> lock(mutex_);
activation_list_.clear();
request_count_ = 0;
}
private:
/// @brief Dispatch a request.
void dispatch()
{
// Get the current highest priority request ready to run from the queue.
boost::unique_lock<mutex_type> lock(mutex_);
if (activation_list_.empty()) return;
// Find the highest priority method ready to run.
typedef typename activation_list_type::iterator iterator;
iterator end = activation_list_.end();
iterator result = std::find_if(
activation_list_.begin(), end, &is_method_ready);
// If no methods are ready, then post into dispatch, as the
// method may have become ready.
if (end == result)
{
io_service_.post(boost::bind(&scheduler::dispatch, this));
return;
}
// Take ownership of request.
boost::shared_ptr<method_request> method = result->second;
activation_list_.erase(result);
// Run method without mutex.
lock.unlock();
method->run();
lock.lock();
// Perform bookkeeping.
--request_count_;
}
static bool always_ready() { return true; }
private:
/// @brief List of outstanding request.
typedef boost::multi_index_container<
pair_type,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<
boost::multi_index::member<pair_type,
typename pair_type::first_type,
&pair_type::first>,
Compare
>
>
> activation_list_type;
activation_list_type activation_list_;
/// @brief Thread group managing threads servicing pool.
boost::thread_group threads_;
/// @brief io_service used to function as a thread pool.
boost::asio::io_service io_service_;
/// @brief Work is used to keep threads servicing io_service.
boost::asio::io_service::work work_;
/// @brief Maximum amount of request.
const std::size_t max_request_;
/// @brief Count of outstanding request.
std::size_t request_count_;
/// @brief Synchronize access to the activation list.
typedef boost::mutex mutex_type;
mutex_type mutex_;
};
typedef scheduler<unsigned int,
std::greater<unsigned int> > high_priority_scheduler;
/// @brief adder is a simple proxy that will delegate work to
/// the scheduler.
class adder
{
public:
adder(high_priority_scheduler& scheduler)
: scheduler_(scheduler)
{}
/// @brief Add a and b with a priority.
///
/// @return Return future result.
template <typename T>
boost::unique_future<T> add(
high_priority_scheduler::priority_type priority,
const T& a, const T& b)
{
// Insert method request
return scheduler_.insert(
priority,
boost::bind(&adder::do_add<T>, a, b));
}
/// @brief Add a and b.
///
/// @return Return future result.
template <typename T>
boost::unique_future<T> add(const T& a, const T& b)
{
return add(high_priority_scheduler::priority_type(), a, b);
}
private:
/// @brief Actual add a and b.
template <typename T>
static T do_add(const T& a, const T& b)
{
std::cout << "Starting addition of '" << a
<< "' and '" << b << "'" << std::endl;
// Mimic busy work.
boost::this_thread::sleep_for(boost::chrono::seconds(2));
std::cout << "Finished addition" << std::endl;
return a + b;
}
private:
high_priority_scheduler& scheduler_;
};
bool get(bool& value) { return value; }
void guarded_call()
{
std::cout << "guarded_call" << std::endl;
}
int main()
{
const unsigned int max_threads = 1;
const unsigned int max_request = 4;
// Sscheduler
high_priority_scheduler scheduler(max_threads, max_request);
// Proxy
adder adder(scheduler);
// Client
// Add guarded method to scheduler.
bool ready = false;
std::cout << "Add guarded method." << std::endl;
boost::unique_future<void> future1 = scheduler.insert(
boost::bind(&get, boost::ref(ready)),
&guarded_call);
// Add 1 + 100 with default priority.
boost::unique_future<int> future2 = adder.add(1, 100);
// Force sleep to try to get scheduler to run request 2 first.
boost::this_thread::sleep_for(boost::chrono::seconds(1));
// Add:
// 2 + 200 with low priority (5)
// "test" + "this" with high priority (99)
boost::unique_future<int> future3 = adder.add(5, 2, 200);
boost::unique_future<std::string> future4 = adder.add(99,
std::string("test"), std::string("this"));
// Max request should have been reached, so add another.
boost::unique_future<int> future5 = adder.add(3, 300);
// Check if request was added.
std::cout << "future1 is valid: " << future1.valid()
<< "
future2 is valid: " << future2.valid()
<< "
future3 is valid: " << future3.valid()
<< "
future4 is valid: " << future4.valid()
<< "
future5 is valid: " << future5.valid()
<< std::endl;
// Get results for future2 and future3. Do nothing with future4's results.
std::cout << "future2 result: " << future2.get()
<< "
future3 result: " << future3.get()
<< std::endl;
std::cout << "Unguarding method." << std::endl;
ready = true;
future1.wait();
}
执行使用线程池为 1,最多 4 个请求.
The execution uses thread pool of 1 with a max of 4 request.
- request1 被保护到程序结束,并且应该最后运行.
- request2 (1 + 100) 以默认优先级插入,应该首先运行.
- request3 (2 + 200) 被插入低优先级,应该在 request4 之后运行.
- request4 ('test' + 'this') 以高优先级插入,应该在 request3 之前运行.
- request5 应该由于最大请求而无法插入,并且应该是无效的.
输出如下:
Add guarded method.
Starting addition of '1' and '100'
future1 is valid: 1
future2 is valid: 1
future3 is valid: 1
future4 is valid: 1
future5 is valid: 0
Finished addition
Starting addition of 'test' and 'this'
Finished addition
Starting addition of '2' and '200'
Finished addition
future2 result: 101
future3 result: 202
Unguarding method.
guarded_call
相关文章