如何在Beast中保持此HTTPS连接?
我对GraphQL服务器进行了大约30,000个查询;因为我有一个高延迟连接,所以我使用线程并行地执行许多查询。目前,每个查询都会建立一个新的连接;我希望重用这些连接,这应该会减少整个下载所需的时间。以下是我的代码:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <chrono>
#include <vector>
#include <array>
#include <iostream>
#include "http.h"
namespace beast=boost::beast;
namespace http=beast::http;
namespace net=boost::asio;
namespace ssl=net::ssl;
using tcp=net::ip::tcp;
using namespace std;
namespace cr=chrono;
struct TimeBytes
/* Used to compute the latency and data rate, which will be used
* to compute the number of I/O threads for the next run.
*/
{
float ms;
int bytes;
};
cr::steady_clock clk;
vector<TimeBytes> timeBytes;
mutex timeBytesMutex;
thread_local string lastProto,lastHost,lastPort;
array<string,4> parseUrl(string url)
// protocol, hostname, port, path. All are strings, including the port.
{
size_t pos0=url.find("://");
size_t pos1;
array<string,4> ret;
ret[0]=url.substr(0,pos0);
if (pos0<url.length())
pos0+=3;
pos1=url.find("/",pos0);
ret[1]=url.substr(pos0,pos1-pos0);
ret[3]=url.substr(pos1);
pos0=ret[1].find(":");
if (pos0<ret[1].length())
{
ret[2]=ret[1].substr(pos0+1);
ret[1]=ret[1].substr(0,pos0);
}
else
if (ret[0]=="https")
ret[2]="443";
else if (ret[0]=="https")
ret[2]="80";
else
ret[2]="0";
return ret;
}
string httpPost(string url,string data)
{
net::io_context context;
ssl::context ctx(ssl::context::tlsv12_client);
tcp::resolver res(context);
tcp::resolver::results_type endpoints;
beast::ssl_stream<beast::tcp_stream> stream(context,ctx);
array<string,4> parsed=parseUrl(url);
http::request<http::string_body> req;
http::response<http::string_body> resp;
beast::flat_buffer buffer;
TimeBytes tb;
cr::nanoseconds elapsed;
cr::time_point<cr::steady_clock> timeStart=clk.now();
//if (parsed[0]==lastProto && parsed[1]==lastHost && parsed[2]==lastPort)
//cout<<"same host
";
//load_root_certificates(ctx);
try
{
ctx.set_verify_mode(ssl::verify_peer);
endpoints=res.resolve(parsed[1],parsed[2]);
beast::get_lowest_layer(stream).connect(endpoints);
SSL_set_tlsext_host_name(stream.native_handle(),parsed[1].c_str());
if (parsed[0]=="https")
stream.handshake(net::ssl::stream_base::client);
req.method(http::verb::post);
req.target(parsed[3]);
req.set(http::field::host,parsed[1]);
req.set(http::field::connection,"keep-alive");
req.set(http::field::user_agent,BOOST_BEAST_VERSION_STRING);
req.set(http::field::content_type,"application/json");
req.set(http::field::accept,"application/json");
req.body()=data;
req.prepare_payload();
http::write(stream,req);
http::read(stream,buffer,resp);
elapsed=clk.now()-timeStart;
tb.ms=elapsed.count()/1e6;
tb.bytes=req.body().size()+resp.body().size()+7626;
// 7626 accounts for HTTP, TCP, IP, and Ethernet headers.
timeBytesMutex.lock();
timeBytes.push_back(tb);
timeBytesMutex.unlock();
beast::close_socket(beast::get_lowest_layer(stream));
if (DEBUG_QUERY)
{
cout<<parsed[0]<<"|
"<<parsed[1]<<"|
"<<parsed[2]<<"|
"<<parsed[3]<<"|
";
cout<<data<<"|
";
}
}
catch (...)
{
}
lastProto=parsed[0];
lastHost=parsed[1];
lastPort=parsed[2];
return resp.body();
}
大多数请求都发送到一台服务器。向另一台服务器发出一些GET请求(使用与httpPost
非常相似的httpGet
函数)。下载数据后,我会对其进行处理,因此我希望在开始处理之前先关闭连接。
close_socket()
之前尝试将context
、ctx
和stream
设为本地线程,但程序在主线程第二次调用httpPost
时崩溃,从http::read
引发错误。(工作线程在主线程的两个查询之间进行一个查询。)在这一点上,我并没有试图保持连接打开,而是试图使线程本地工作,以便我可以保持连接打开。
解决方案
我强烈建议使用异步接口。由于大部分时间显然都花在等待IO上,因此您很可能只从单个线程获得所有吞吐量。
下面的示例回答了您的问题(如何使客户端对多个请求保持打开状态),同时使处理异步化。现在,缺点是需要对单个客户端上的所有请求进行排序(这就是我使用_tasks
队列的目的)。然而,这可能会起到启发作用。
请注意,初始化函数适用于所有完成处理程序结果类型:net::use_future
、net::spawn
(协程)等。
Live On Coliru
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <chrono>
#include <deque>
#include <iomanip>
#include <iostream>
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
using clk = std::chrono::steady_clock;
using net::ip::tcp;
using beast::error_code;
using namespace std::chrono_literals;
/* Used to compute the latency and data rate, which will be used to compute the
* number of I/O threads for the next run. */
struct TimeBytes {
long double ms;
size_t bytes;
};
static std::vector<TimeBytes> timeBytes;
static std::mutex timeBytesMutex;
struct Url {
struct Spec {
std::string hostname, port;
bool operator<(Spec const& rhs) const {
return std::tie(hostname, port) < std::tie(rhs.hostname, rhs.port);
}
};
std::string protocol, hostname, port, path;
Spec specification() const { return {hostname, port}; }
};
#include <boost/spirit/home/x3.hpp>
#include <boost/fusion/adapted/std_tuple.hpp>
namespace x3 = boost::spirit::x3;
Url parseUrl(std::string const& url)
{
Url ret;
std::string hostport;
{
static const auto url_ = *(x3::char_ - "://") >> "://" // protocol
>> +~x3::char_('/') // hostname
>> *x3::char_; // path
auto into = std::tie(ret.protocol, hostport, ret.path);
parse(begin(url), end(url), x3::expect[url_], into);
}
{
static const auto portspec_ = -(':' >> x3::uint_) >> x3::eoi;
static const auto hostport_ =
x3::raw[+(+~x3::char_(':') | !portspec_ >> x3::char_)] //
>> -portspec_;
boost::optional<uint16_t> port;
auto into = std::tie(ret.hostname, port);
parse(begin(hostport), end(hostport), x3::expect[hostport_], into);
if (port.has_value()) { ret.port = std::to_string(*port); }
else if (ret.protocol == "https") { ret.port = "443"; }
else if (ret.protocol == "http") { ret.port = "80"; }
else { ret.port = "0"; }
}
return ret;
}
struct Client : std::enable_shared_from_this<Client> {
public:
Client(net::any_io_executor ex, Url::Spec spec, ssl::context& ctx)
: _executor(ex)
, _spec(spec)
, _sslcontext(ctx)
{
}
template <typename Token>
auto async_request(http::verb verb, std::string const& path,
std::string const& data, Token&& token)
{
using R = typename net::async_result<std::decay_t<Token>,
void(error_code, std::string)>;
using H = typename R::completion_handler_type;
H handler(std::forward<Token>(token));
R result(handler);
auto chain_tasks = [this, h = std::move(handler),
self = shared_from_this()](auto&&... args) mutable {
if (!self->_tasks.empty()) {
dispatch(self->_executor, [this, self] {
if (not _tasks.empty()) _tasks.pop_front();
if (not _tasks.empty()) _tasks.front()->initiate();
});
}
std::move(h)(std::forward<decltype(args)>(args)...);
};
auto task = std::make_shared<RequestOp<decltype(chain_tasks)>>(
this, verb, path, data, chain_tasks);
enqueue(std::move(task));
return result.get();
}
template <typename Token>
auto async_post(std::string const& path, std::string const& data,
Token&& token)
{
return async_request(http::verb::post,path, data, std::forward<Token>(token));
}
template <typename Token>
auto async_get(std::string const& path, Token&& token)
{
return async_request(http::verb::get,path, "", std::forward<Token>(token));
}
private:
template <typename Token> auto async_reconnect(Token&& token)
{
using R = typename net::async_result<std::decay_t<Token>, void(error_code)>;
using H = typename R::completion_handler_type;
H handler(std::forward<Token>(token));
R result(handler);
assert(!_stream.has_value()); // probably a program flow bu
_stream.emplace(_executor, _sslcontext);
std::make_shared<ReconnectOp<H>>(this, std::move(handler))->start();
return result.get();
}
template <typename Handler>
struct ReconnectOp : std::enable_shared_from_this<ReconnectOp<Handler>> {
ReconnectOp(Client* client, Handler h)
: _client{client}
, _handler(std::move(h))
, _resolver(client->_stream->get_executor())
{
}
Client* _client;
Handler _handler;
tcp::resolver _resolver;
bool checked(error_code ec, bool complete = false) {
if (complete || ec)
std::move(_handler)(ec);
if (ec && _client->_stream.has_value())
{
std::cerr << "Socket " << _client->_stream->native_handle()
<< " closed due to " << ec.message() << std::endl;
_client->_stream.reset();
}
return !ec.failed();
}
void start()
{
_resolver.async_resolve(
_client->_spec.hostname, _client->_spec.port,
beast::bind_front_handler(&ReconnectOp::on_resolved,
this->shared_from_this()));
}
void on_resolved(error_code ec, tcp::resolver::results_type ep)
{
if (checked(ec)) {
beast::get_lowest_layer(*_client->_stream)
.async_connect(
ep,
beast::bind_front_handler(&ReconnectOp::on_connected,
this->shared_from_this()));
}
}
void on_connected(error_code ec, tcp::endpoint ep) {
if (checked(ec)) {
std::cerr << "Socket " << _client->_stream->native_handle()
<< " (re)connected to " << ep << std::endl;
auto& hostname = _client->_spec.hostname;
SSL_set_tlsext_host_name(_client->_stream->native_handle(),
hostname.c_str());
_client->_stream->async_handshake(
Stream::client,
beast::bind_front_handler(&ReconnectOp::on_ready,
this->shared_from_this()));
}
}
void on_ready(error_code ec) {
checked(ec, true);
}
};
struct IAsyncTask {
virtual void initiate() = 0;
};
template <typename Handler>
struct RequestOp : IAsyncTask, std::enable_shared_from_this<RequestOp<Handler>> {
RequestOp(Client* client, http::verb verb, std::string const& path,
std::string data, Handler h)
: _client(client)
, _handler(std::move(h))
, _request(verb, path, 11, std::move(data))
{
_request.set(http::field::host, _client->_spec.hostname);
_request.set(http::field::connection, "keep-alive");
_request.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
_request.set(http::field::content_type, "application/json");
_request.set(http::field::accept, "application/json");
_request.prepare_payload();
}
Client* _client;
Handler _handler;
http::request<http::string_body> _request;
http::response<http::string_body> _response;
beast::flat_buffer _buffer;
size_t _bandwidth = 0;
clk::time_point _start = clk::now();
bool checked(error_code ec, bool complete = false) {
if (complete || ec)
std::move(_handler)(ec, std::move(_response.body()));
if (ec)
_client->_stream.reset();
return !ec.failed();
}
void initiate() override
{
if (!_client->_stream.has_value()) {
_client->async_reconnect(beast::bind_front_handler(
&RequestOp::on_connected, this->shared_from_this()));
} else {
on_connected(error_code{});
}
}
void on_connected(error_code ec) {
_start = clk::now(); // This matches the start of measurements in
// the original, synchronous code
http::async_write(*_client->_stream, _request,
beast::bind_front_handler(
&RequestOp::on_sent, this->shared_from_this()));
}
void on_sent(error_code ec, size_t transferred) {
_bandwidth += transferred; // measuring actual bytes including HTTP headers
if (checked(ec)) {
http::async_read(
*_client->_stream, _buffer, _response,
beast::bind_front_handler(&RequestOp::on_response,
this->shared_from_this()));
}
}
void on_response(error_code ec, size_t transferred) {
_bandwidth += transferred; // measuring actual bytes including HTTP headers
std::lock_guard lk(timeBytesMutex);
timeBytes.push_back({(clk::now() - _start) / 1.0ms, _bandwidth});
checked(ec, true);
}
};
private:
net::any_io_executor _executor;
Url::Spec _spec;
ssl::context& _sslcontext;
using Stream = beast::ssl_stream<beast::tcp_stream>;
std::optional<Stream> _stream; // nullopt when disconnected
// task queueing
using AsyncTask = std::shared_ptr<IAsyncTask>;
std::deque<AsyncTask> _tasks;
void enqueue(AsyncTask task) {
post(_executor,
[=, t = std::move(task), this, self = shared_from_this()] {
_tasks.push_back(std::move(t));
if (_tasks.size() == 1) {
_tasks.front()->initiate();
}
});
}
};
int main()
{
ssl::context ctx(ssl::context::tlsv12_client);
ctx.set_verify_mode(ssl::verify_peer);
ctx.set_default_verify_paths();
// load_root_certificates(ctx);
net::thread_pool io(1);
std::map<Url::Spec, std::shared_ptr<Client> > pool;
using V = http::verb;
for (auto [url, verb, data] : {
std::tuple //
{"https://httpbin.org/post", V::post, "post data"},
{"https://httpbin.org/delay/5", V::delete_, ""},
{"https://httpbin.org/base64/ZGVjb2RlZCBiYXM2NA==", V::get, ""},
{"https://httpbin.org/delay/7", V::patch, ""},
{"https://httpbin.org/stream/3", V::get, ""},
{"https://httpbin.org/uuid", V::get, ""},
}) //
{
auto parsed = parseUrl(url);
std::cout << std::quoted(parsed.protocol) << " "
<< std::quoted(parsed.hostname) << " "
<< std::quoted(parsed.port) << " "
<< std::quoted(parsed.path) << "
";
auto spec = parsed.specification();
if (!pool.contains(spec)) {
pool.emplace(spec,
std::make_shared<Client>(
make_strand(io.get_executor()), spec, ctx));
}
pool.at(spec)->async_request(
verb, parsed.path, data,
[=, v = verb, u = url](error_code ec, std::string const& body) {
std::cout << v << " to " << u << ": " << std::quoted(body)
<< std::endl;
});
}
io.join();
for (auto& [time, bytes] : timeBytes) {
std::cout << bytes << " bytes in " << time << "ms
";
}
}
在我的系统上打印
"https" "httpbin.org" "443" "/post"
"https" "httpbin.org" "443" "/delay/5"
"https" "httpbin.org" "443" "/base64/ZGVjb2RlZCBiYXM2NA=="
"https" "httpbin.org" "443" "/delay/7"
"https" "httpbin.org" "443" "/stream/3"
"https" "httpbin.org" "443" "/uuid"
Socket 0x7f4ad4001060 (re)connected to 18.232.227.86:443
POST to https://httpbin.org/post: "{
"args": {},
"data": "post data",
"files": {},
"form": {},
"headers": {
"Accept": "application/json",
"Content-Length": "9",
"Content-Type": "application/json",
"Host": "httpbin.org",
"User-Agent": "Boost.Beast/318",
"X-Amzn-Trace-Id": "Root=1-618b513c-2c51c112061b10456a5e3d4e"
},
"json": null,
"origin": "163.158.244.77",
"url": "https://httpbin.org/post"
}
"
DELETE to https://httpbin.org/delay/5: "{
"args": {},
"data": "",
"files": {},
"form": {},
"headers": {
"Accept": "application/json",
"Content-Type": "application/json",
"Host": "httpbin.org",
"User-Agent": "Boost.Beast/318",
"X-Amzn-Trace-Id": "Root=1-618b513c-324c97504eb79d8b743c6c5d"
},
"origin": "163.158.244.77",
"url": "https://httpbin.org/delay/5"
}
"
GET to https://httpbin.org/base64/ZGVjb2RlZCBiYXM2NA==: "decoded bas64"
PATCH to https://httpbin.org/delay/7: "{
"args": {},
"data": "",
"files": {},
"form": {},
"headers": {
"Accept": "application/json",
"Content-Type": "application/json",
"Host": "httpbin.org",
"User-Agent": "Boost.Beast/318",
"X-Amzn-Trace-Id": "Root=1-618b5141-3a8c30e60562df583061fc5a"
},
"origin": "163.158.244.77",
"url": "https://httpbin.org/delay/7"
}
"
GET to https://httpbin.org/stream/3: "{"url": "https://httpbin.org/stream/3", "args": {}, "headers": {"Host": "httpbin.org", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 0}
{"url": "https://httpbin.org/stream/3", "args": {}, "headers": {"Host": "httpbin.org", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 1}
{"url": "https://httpbin.org/stream/3", "args": {}, "headers": {"Host": "httpbin.org", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 2}
"
GET to https://httpbin.org/uuid: "{
"uuid": "4557c909-880e-456c-8ef9-049a72f5fda1"
}
"
826 bytes in 84.9807ms
752 bytes in 5267.26ms
425 bytes in 84.6031ms
751 bytes in 7085.28ms
1280 bytes in 86.6554ms
434 bytes in 85.0086ms
注意:
Httpbin.org有各种测试URL-其中一些会产生长时间延迟,因此会出现计时
只有一个连接。如果出现IO错误,我们将断开连接(下一次请求时应重新连接)
HTTP错误不是错误,因为连接保持有效
DNS解析、连接和握手也是异步
相关文章