如何在Beast中保持此HTTPS连接?

我对GraphQL服务器进行了大约30,000个查询;因为我有一个高延迟连接,所以我使用线程并行地执行许多查询。目前,每个查询都会建立一个新的连接;我希望重用这些连接,这应该会减少整个下载所需的时间。以下是我的代码:

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <chrono>
#include <vector>
#include <array>
#include <iostream>
#include "http.h"

namespace beast=boost::beast;
namespace http=beast::http;
namespace net=boost::asio;
namespace ssl=net::ssl;
using tcp=net::ip::tcp;
using namespace std;
namespace cr=chrono;

/* One timing sample per request. Used to compute the latency and data rate,
 * which will be used to compute the number of I/O threads for the next run.
 */
struct TimeBytes
{
  float ms;   // elapsed time of the request, in milliseconds
  int bytes;  // request + response body sizes plus a fixed header overhead estimate
};

// Clock instance; only its now() is used to time requests.
cr::steady_clock clk;
// httpPost appends one sample per successful request; guarded by timeBytesMutex.
vector<TimeBytes> timeBytes;
mutex timeBytesMutex;
// Per-thread endpoint (protocol, host, port) of the previous httpPost call.
thread_local string lastProto;
thread_local string lastHost;
thread_local string lastPort;

std::array<std::string,4> parseUrl(std::string url)
/* Split a URL into {protocol, hostname, port, path}. All are strings,
 * including the port.
 *  - A missing "scheme://" prefix gives an empty protocol.
 *  - A missing path gives "/" (an HTTP request target must not be empty).
 *  - Without an explicit ":port", https maps to "443", http to "80" and
 *    anything else to "0".
 * Never throws on malformed input (the old version hit substr(npos)).
 */
{
  std::array<std::string,4> ret;
  std::size_t hostStart=0;
  const std::size_t schemeEnd=url.find("://");
  if (schemeEnd!=std::string::npos)
  {
    ret[0]=url.substr(0,schemeEnd);
    hostStart=schemeEnd+3;
  }
  const std::size_t pathStart=url.find('/',hostStart);
  if (pathStart==std::string::npos)
  {
    ret[1]=url.substr(hostStart);
    ret[3]="/";
  }
  else
  {
    ret[1]=url.substr(hostStart,pathStart-hostStart);
    ret[3]=url.substr(pathStart);
  }
  const std::size_t colon=ret[1].find(':');
  if (colon!=std::string::npos)
  {
    ret[2]=ret[1].substr(colon+1);
    ret[1].erase(colon);
  }
  else if (ret[0]=="https")
    ret[2]="443";
  else if (ret[0]=="http")   // was a duplicated "https" test, so http never got port 80
    ret[2]="80";
  else
    ret[2]="0";
  return ret;
}

string httpPost(string url,string data)
/* POST `data` (as application/json) to `url` and return the response body.
 * Every call opens a new connection and closes it again afterwards.
 * On success the request duration and an estimate of the bytes transferred
 * are appended to timeBytes (under timeBytesMutex). Any failure is reported
 * on cerr and whatever body was received (usually nothing) is returned.
 */
{
  net::io_context context;
  ssl::context ctx(ssl::context::tlsv12_client);
  tcp::resolver res(context);
  tcp::resolver::results_type endpoints;
  beast::ssl_stream<beast::tcp_stream> stream(context,ctx);
  array<string,4> parsed=parseUrl(url);
  bool const https=parsed[0]=="https";
  http::request<http::string_body> req;
  http::response<http::string_body> resp;
  beast::flat_buffer buffer;
  TimeBytes tb;
  cr::nanoseconds elapsed;
  cr::time_point<cr::steady_clock> timeStart=clk.now();
  //if (parsed[0]==lastProto && parsed[1]==lastHost && parsed[2]==lastPort)
    //cout<<"same host\n";
  //load_root_certificates(ctx);
  try
  {
    ctx.set_verify_mode(ssl::verify_peer);
    // verify_peer rejects every server unless trust anchors are loaded;
    // load_root_certificates() is disabled, so use the system CA store.
    ctx.set_default_verify_paths();
    endpoints=res.resolve(parsed[1],parsed[2]);
    beast::get_lowest_layer(stream).connect(endpoints);
    if (https)
    {
      // SNI: many hosts need it to present the right certificate.
      if (!SSL_set_tlsext_host_name(stream.native_handle(),parsed[1].c_str()))
        throw beast::system_error(beast::error_code(
          static_cast<int>(::ERR_get_error()),net::error::get_ssl_category()));
      stream.handshake(net::ssl::stream_base::client);
    }
    req.method(http::verb::post);
    req.target(parsed[3]);
    req.set(http::field::host,parsed[1]);
    req.set(http::field::connection,"keep-alive");
    req.set(http::field::user_agent,BOOST_BEAST_VERSION_STRING);
    req.set(http::field::content_type,"application/json");
    req.set(http::field::accept,"application/json");
    req.body()=data;
    req.prepare_payload();
    if (https)
    {
      http::write(stream,req);
      http::read(stream,buffer,resp);
    }
    else
    {
      // Plain HTTP: talk to the TCP layer directly; TLS was never set up.
      http::write(beast::get_lowest_layer(stream),req);
      http::read(beast::get_lowest_layer(stream),buffer,resp);
    }
    elapsed=clk.now()-timeStart;
    tb.ms=elapsed.count()/1e6;
    // 7626 accounts for HTTP, TCP, IP, and Ethernet headers.
    tb.bytes=static_cast<int>(req.body().size()+resp.body().size()+7626);
    {
      lock_guard<mutex> lock(timeBytesMutex);
      timeBytes.push_back(tb);
    }
    beast::close_socket(beast::get_lowest_layer(stream));
    if (DEBUG_QUERY)
    {
      cout<<parsed[0]<<"|\n"<<parsed[1]<<"|\n"<<parsed[2]<<"|\n"<<parsed[3]<<"|\n";
      cout<<data<<"|\n";
    }
  }
  catch (exception const& e)
  {
    // Best effort: report instead of silently swallowing, then fall through.
    cerr<<"httpPost "<<url<<" failed: "<<e.what()<<'\n';
  }
  catch (...)
  {
    cerr<<"httpPost "<<url<<" failed\n";
  }
  lastProto=parsed[0];
  lastHost=parsed[1];
  lastPort=parsed[2];
  return resp.body();
}

大多数请求都发送到一台服务器。向另一台服务器发出一些GET请求(使用与httpPost非常相似的httpGet函数)。下载数据后,我会对其进行处理,因此我希望在开始处理之前先关闭连接。

我尝试在 close_socket() 之前把 context、ctx 和 stream 声明为 thread_local(线程局部变量),但主线程第二次调用 httpPost 时程序崩溃,错误由 http::read 抛出。(在主线程的两次查询之间,工作线程执行了一次查询。)目前我还没有尝试保持连接打开,而是想先让 thread_local 方案正常工作,以便之后能保持连接打开。


解决方案

我强烈建议使用异步接口。由于大部分时间显然都花在等待 IO 上,您很可能只用单个线程就能获得全部吞吐量。

下面的示例回答了您的问题(如何让客户端在多个请求之间保持连接),同时把处理改为异步。缺点是同一个客户端上的所有请求必须依次串行执行(这就是我使用 _tasks 队列的原因)。不过,这个示例应该能给您一些启发。

请注意,这些发起函数(initiating function)适用于所有完成令牌(completion token)类型:普通回调、net::use_future、net::spawn(协程)等。

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <chrono>
#include <deque>
#include <iomanip>
#include <iostream>

namespace net   = boost::asio;
namespace ssl   = net::ssl;
namespace beast = boost::beast;
namespace http  = beast::http;
using clk       = std::chrono::steady_clock;
using net::ip::tcp;
using beast::error_code;

using namespace std::chrono_literals;

// One sample per completed request. Used to compute the latency and data
// rate, which will be used to compute the number of I/O threads for the
// next run.
struct TimeBytes {
    long double ms;    // milliseconds from writing the request to reading the response
    std::size_t bytes; // bytes written plus bytes read, HTTP headers included
};

// Timing samples pushed by each request; writers hold timeBytesMutex.
namespace {
std::vector<TimeBytes> timeBytes;
std::mutex             timeBytesMutex;
} // namespace

// A URL split into its components. Spec is the (hostname, port) pair that
// identifies one connection; it orders by hostname first and then by port,
// so it can serve as a std::map key.
struct Url {
    struct Spec {
        std::string hostname;
        std::string port;

        bool operator<(Spec const& rhs) const {
            if (hostname != rhs.hostname)
                return hostname < rhs.hostname;
            return port < rhs.port;
        }
    };

    std::string protocol;
    std::string hostname;
    std::string port;
    std::string path;

    Spec specification() const {
        Spec spec;
        spec.hostname = hostname;
        spec.port     = port;
        return spec;
    }
};

#include <boost/spirit/home/x3.hpp>
#include <boost/fusion/adapted/std_tuple.hpp>
namespace x3 = boost::spirit::x3;

// Parse "protocol://host[:port][/path]" into a Url.
//  - Throws x3::expectation_failure when the "://" separator or the host is
//    missing.
//  - Without an explicit port, https maps to "443", http to "80" and anything
//    else to "0".
//  - An absent path becomes "/" because an empty HTTP request target would
//    produce an invalid request line.
Url parseUrl(std::string const& url)
{
    Url ret;
    std::string hostport;
    {
        static const auto url_ = *(x3::char_ - "://") >> "://" // protocol
            >> +~x3::char_('/')                                // hostname
            >> *x3::char_;                                     // path
        auto into = std::tie(ret.protocol, hostport, ret.path);
        parse(begin(url), end(url), x3::expect[url_], into);
    }

    if (ret.path.empty())
        ret.path = "/";

    {
        // A trailing ":<digits>" is the port; any other ':' stays part of
        // the host name.
        static const auto portspec_ = -(':' >> x3::uint_) >> x3::eoi;
        static const auto hostport_ =
            x3::raw[+(+~x3::char_(':') | !portspec_ >> x3::char_)] //
            >> -portspec_;

        boost::optional<uint16_t> port;
        auto into = std::tie(ret.hostname, port);
        parse(begin(hostport), end(hostport), x3::expect[hostport_], into);

        if (port.has_value())             { ret.port = std::to_string(*port); }
        else if (ret.protocol == "https") { ret.port = "443";                 }
        else if (ret.protocol == "http")  { ret.port = "80";                  }
        else                              { ret.port = "0";                   }
    }

    return ret;
}

/* One persistent (keep-alive) TLS connection to a single host:port.
 *
 * Requests submitted through async_request/async_post/async_get are queued in
 * _tasks and executed strictly one after another on the same stream, so the
 * connection is reused between requests. The stream is created on demand:
 * it is dropped after any I/O error, or when the server's response says the
 * connection will not be kept alive. The next queued request then reconnects.
 *
 * All state is touched only from handlers posted/dispatched to _executor
 * without locking. The executor is therefore assumed to be a strand (main
 * passes make_strand(...)) whenever more than one thread runs it.
 */
struct Client : std::enable_shared_from_this<Client> {
  public:
    Client(net::any_io_executor ex, Url::Spec spec, ssl::context& ctx)
        : _executor(ex)
        , _spec(spec)
        , _sslcontext(ctx)
    {
    }

    // Queue a request for `path` on this connection. It completes through
    // `token` with (error_code, response body). Any completion token works
    // (callback, net::use_future, coroutine, ...).
    template <typename Token>
    auto async_request(http::verb verb, std::string const& path,
                       std::string const& data, Token&& token)
    {
        using R = typename net::async_result<std::decay_t<Token>,
                                             void(error_code, std::string)>;
        using H = typename R::completion_handler_type;
        H handler(std::forward<Token>(token));
        R result(handler);

        // Wrap the user's handler: retire the finished task from the queue
        // and start the next one, then complete the user's operation.
        auto chain_tasks = [this, h = std::move(handler),
                            self = shared_from_this()](auto&&... args) mutable {
            if (!self->_tasks.empty()) {
                dispatch(self->_executor, [this, self] {
                    if (not _tasks.empty()) _tasks.pop_front();
                    if (not _tasks.empty()) _tasks.front()->initiate();
                });
            }

            std::move(h)(std::forward<decltype(args)>(args)...);
        };

        // Move (not copy) so move-only completion handlers are supported.
        auto task = std::make_shared<RequestOp<decltype(chain_tasks)>>(
            this, verb, path, data, std::move(chain_tasks));

        enqueue(std::move(task));

        return result.get();
    }

    template <typename Token>
    auto async_post(std::string const& path, std::string const& data,
                    Token&& token)
    {
        return async_request(http::verb::post,path, data, std::forward<Token>(token));
    }

    template <typename Token>
    auto async_get(std::string const& path, Token&& token)
    {
        return async_request(http::verb::get,path, "", std::forward<Token>(token));
    }

  private:
    // Create a fresh stream, then resolve, connect and perform the TLS
    // handshake. Completes with (error_code).
    template <typename Token> auto async_reconnect(Token&& token)
    {
        using R = typename net::async_result<std::decay_t<Token>, void(error_code)>;
        using H = typename R::completion_handler_type;
        H handler(std::forward<Token>(token));
        R result(handler);

        assert(!_stream.has_value()); // probably a program flow bug
        _stream.emplace(_executor, _sslcontext);

        std::make_shared<ReconnectOp<H>>(this, std::move(handler))->start();

        return result.get();
    }

    // Composed operation: resolve -> connect -> (SNI) -> TLS handshake.
    template <typename Handler>
    struct ReconnectOp : std::enable_shared_from_this<ReconnectOp<Handler>> {
        ReconnectOp(Client* client, Handler h)
            : _client{client}
            , _handler(std::move(h))
            , _resolver(client->_stream->get_executor())
        {
        }

        Client*       _client;
        Handler       _handler;
        tcp::resolver _resolver;

        // Returns true if the chain should continue. On error (or when
        // complete) the handler is invoked. A failed stream is discarded
        // *before* that, because the handler may synchronously start the next
        // queued request. That request must see the client as disconnected
        // and must not have its new stream destroyed afterwards.
        bool checked(error_code ec, bool complete = false) {
            if (ec && _client->_stream.has_value())
            {
                std::cerr << "Socket " << _client->_stream->native_handle()
                          << " closed due to " << ec.message() << std::endl;
                _client->_stream.reset();
            }
            if (complete || ec)
                std::move(_handler)(ec);

            return !ec.failed();
        }

        void start()
        {
            _resolver.async_resolve(
                _client->_spec.hostname, _client->_spec.port,
                beast::bind_front_handler(&ReconnectOp::on_resolved,
                                          this->shared_from_this()));
        }

        void on_resolved(error_code ec, tcp::resolver::results_type ep)
        {
            if (checked(ec)) {
                beast::get_lowest_layer(*_client->_stream)
                    .async_connect(
                        ep,
                        beast::bind_front_handler(&ReconnectOp::on_connected,
                                                  this->shared_from_this()));
            }
        }

        void on_connected(error_code ec, tcp::endpoint ep) {
            if (checked(ec)) {
                std::cerr << "Socket " << _client->_stream->native_handle()
                          << " (re)connected to " << ep << std::endl;

                auto& hostname = _client->_spec.hostname;
                SSL_set_tlsext_host_name(_client->_stream->native_handle(),
                                         hostname.c_str());

                _client->_stream->async_handshake(
                    Stream::client,
                    beast::bind_front_handler(&ReconnectOp::on_ready,
                                              this->shared_from_this()));
            }
        }

        void on_ready(error_code ec) {
            checked(ec, true);
        }
    };

    // Type-erased queue entry. Deleted through shared_ptr<IAsyncTask>.
    struct IAsyncTask {
        virtual ~IAsyncTask() = default;
        virtual void initiate() = 0;
    };

    // One request/response exchange on the client's stream.
    template <typename Handler>
    struct RequestOp : IAsyncTask, std::enable_shared_from_this<RequestOp<Handler>> {
        RequestOp(Client* client, http::verb verb, std::string const& path,
                  std::string data, Handler h)
            : _client(client)
            , _handler(std::move(h))
            , _request(verb, path, 11, std::move(data))
        {
            _request.set(http::field::host,         _client->_spec.hostname);
            _request.set(http::field::connection,   "keep-alive");
            _request.set(http::field::user_agent,   BOOST_BEAST_VERSION_STRING);
            _request.set(http::field::content_type, "application/json");
            _request.set(http::field::accept,       "application/json");
            _request.prepare_payload();
        }

        Client*                           _client;
        Handler                           _handler;
        http::request<http::string_body>  _request;
        http::response<http::string_body> _response;
        beast::flat_buffer                _buffer;
        size_t                            _bandwidth = 0;
        clk::time_point                   _start = clk::now();

        // Returns true if the chain should continue. The stream is dropped
        // before the handler runs (the handler may start the next request
        // inline). It is dropped on any error, and also after a complete
        // response whose headers say the server will not keep the
        // connection alive.
        bool checked(error_code ec, bool complete = false) {
            if (ec || (complete && !_response.keep_alive()))
                _client->_stream.reset();
            if (complete || ec)
                std::move(_handler)(ec, std::move(_response.body()));

            return !ec.failed();
        }

        void initiate() override
        {
            if (!_client->_stream.has_value()) {
                _client->async_reconnect(beast::bind_front_handler(
                    &RequestOp::on_connected, this->shared_from_this()));
            } else {
                on_connected(error_code{});
            }
        }

        void on_connected(error_code ec) {
            // A failed (re)connect must not proceed to write on the stream.
            if (!checked(ec))
                return;

            _start = clk::now(); // This matches the start of measurements in
                                 // the original, synchronous code
            http::async_write(*_client->_stream, _request,
                              beast::bind_front_handler(
                                  &RequestOp::on_sent, this->shared_from_this()));
        }

        void on_sent(error_code ec, size_t transferred) {
            _bandwidth += transferred; // measuring actual bytes including HTTP headers

            if (checked(ec)) {
                http::async_read(
                    *_client->_stream, _buffer, _response,
                    beast::bind_front_handler(&RequestOp::on_response,
                                              this->shared_from_this()));
            }
        }

        void on_response(error_code ec, size_t transferred) {
            _bandwidth += transferred; // measuring actual bytes including HTTP headers

            {
                // Hold the lock only for the push, not while user code runs.
                std::lock_guard lk(timeBytesMutex);
                timeBytes.push_back({(clk::now() - _start) / 1.0ms, _bandwidth});
            }

            checked(ec, true);
        }
    };

  private:
    net::any_io_executor _executor;
    Url::Spec            _spec;
    ssl::context&        _sslcontext;

    using Stream = beast::ssl_stream<beast::tcp_stream>;
    std::optional<Stream> _stream; // nullopt when disconnected

    // task queueing
    using AsyncTask = std::shared_ptr<IAsyncTask>;
    std::deque<AsyncTask> _tasks;

    // Append a task on the executor; start it immediately if the queue was
    // idle. Otherwise the previous task's completion starts it.
    void enqueue(AsyncTask task) {
        post(_executor,
             [this, self = shared_from_this(), t = std::move(task)]() mutable {
                 _tasks.push_back(std::move(t));
                 if (_tasks.size() == 1) {
                     _tasks.front()->initiate();
                 }
             });
    }
};

// Demo driver: issue several requests against httpbin.org through one
// pooled Client per (hostname, port). Each result is printed as it
// completes, followed by the collected timing samples.
int main()
{
    ssl::context ctx(ssl::context::tlsv12_client);
    ctx.set_verify_mode(ssl::verify_peer);
    ctx.set_default_verify_paths();
    // load_root_certificates(ctx);

    net::thread_pool io(1);
    std::map<Url::Spec, std::shared_ptr<Client> > pool;

    using V = http::verb;
    for (auto [url, verb, data] : {
     std::tuple //
     {"https://httpbin.org/post",                        V::post,    "post data"},
     {"https://httpbin.org/delay/5",                     V::delete_, ""},
     {"https://httpbin.org/base64/ZGVjb2RlZCBiYXM2NA==", V::get,     ""},
     {"https://httpbin.org/delay/7",                     V::patch,   ""},
     {"https://httpbin.org/stream/3",                    V::get,     ""},
     {"https://httpbin.org/uuid",                        V::get,     ""},
    }) //
    {
        auto parsed = parseUrl(url);
        std::cout << std::quoted(parsed.protocol) << " "
                  << std::quoted(parsed.hostname) << " "
                  << std::quoted(parsed.port) << " "
                  << std::quoted(parsed.path) << "\n";

        auto spec = parsed.specification();

        // One lookup: reuse the existing client for this host:port or create it.
        auto it = pool.find(spec);
        if (it == pool.end()) {
            it = pool.emplace(spec,
                              std::make_shared<Client>(
                                  make_strand(io.get_executor()), spec, ctx))
                     .first;
        }

        it->second->async_request(
            verb, parsed.path, data,
            [v = verb, u = url](error_code ec, std::string const& body) {
                if (ec) {
                    std::cout << v << " to " << u << " failed: "
                              << ec.message() << std::endl;
                } else {
                    std::cout << v << " to " << u << ": " << std::quoted(body)
                              << std::endl;
                }
            });
    }

    io.join();

    for (auto& [time, bytes] : timeBytes) {
        std::cout << bytes << " bytes in " << time << "ms\n";
    }
}

在我的系统上输出如下:

"https" "httpbin.org" "443" "/post"
"https" "httpbin.org" "443" "/delay/5"
"https" "httpbin.org" "443" "/base64/ZGVjb2RlZCBiYXM2NA=="
"https" "httpbin.org" "443" "/delay/7"
"https" "httpbin.org" "443" "/stream/3"
"https" "httpbin.org" "443" "/uuid"
Socket 0x7f4ad4001060 (re)connected to 18.232.227.86:443
POST to https://httpbin.org/post: "{
  "args": {}, 
  "data": "post data", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "application/json", 
    "Content-Length": "9", 
    "Content-Type": "application/json", 
    "Host": "httpbin.org", 
    "User-Agent": "Boost.Beast/318", 
    "X-Amzn-Trace-Id": "Root=1-618b513c-2c51c112061b10456a5e3d4e"
  }, 
  "json": null, 
  "origin": "163.158.244.77", 
  "url": "https://httpbin.org/post"
}
"
DELETE to https://httpbin.org/delay/5: "{
  "args": {}, 
  "data": "", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "application/json", 
    "Content-Type": "application/json", 
    "Host": "httpbin.org", 
    "User-Agent": "Boost.Beast/318", 
    "X-Amzn-Trace-Id": "Root=1-618b513c-324c97504eb79d8b743c6c5d"
  }, 
  "origin": "163.158.244.77", 
  "url": "https://httpbin.org/delay/5"
}
"
GET to https://httpbin.org/base64/ZGVjb2RlZCBiYXM2NA==: "decoded bas64"
PATCH to https://httpbin.org/delay/7: "{
  "args": {}, 
  "data": "", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "application/json", 
    "Content-Type": "application/json", 
    "Host": "httpbin.org", 
    "User-Agent": "Boost.Beast/318", 
    "X-Amzn-Trace-Id": "Root=1-618b5141-3a8c30e60562df583061fc5a"
  }, 
  "origin": "163.158.244.77", 
  "url": "https://httpbin.org/delay/7"
}
"
GET to https://httpbin.org/stream/3: "{"url": "https://httpbin.org/stream/3", "args": {}, "headers": {"Host": "httpbin.org", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 0}
{"url": "https://httpbin.org/stream/3", "args": {}, "headers": {"Host": "httpbin.org", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 1}
{"url": "https://httpbin.org/stream/3", "args": {}, "headers": {"Host": "httpbin.org", "X-Amzn-Trace-Id": "Root=1-618b5148-45fce8a8432930a006c0a574", "User-Agent": "Boost.Beast/318", "Content-Type": "application/json", "Accept": "application/json"}, "origin": "163.158.244.77", "id": 2}
"
GET to https://httpbin.org/uuid: "{
  "uuid": "4557c909-880e-456c-8ef9-049a72f5fda1"
}
"
826 bytes in 84.9807ms
752 bytes in 5267.26ms
425 bytes in 84.6031ms
751 bytes in 7085.28ms
1280 bytes in 86.6554ms
434 bytes in 85.0086ms

注意:

  • httpbin.org 提供各种测试 URL,其中一些会故意产生较长的延迟,所以计时结果中会出现数秒的耗时

  • 只有一个连接。如果出现IO错误,我们将断开连接(下一次请求时应重新连接)

  • HTTP 错误状态码(如 404、500)不被视为 IO 错误,因为连接仍然有效

  • DNS 解析、建立连接和 TLS 握手也都是异步的

相关文章