asio官方文档笔记

文档链接 https://think-async.com/Asio/asio-1.22.1/doc/asio/overview.html

C++20 协程支持

Reference

asio基本分析

io_context是作为调用操作系统接口的中间层

同步模式
当发生错误的时候,如果没有指定error_code,则会抛出异常。 false作为没有异常出现

异步模式
调用的异步请求API,会产生一个task。
当task对应的操作结束后,task会被放入队列中,等待io_context::run()读取队列中的task进行分发。

异步模型

异步操作

同步操作的返回值与调用的函数及其参数有关。
异步操作handler参数的类型和顺序与初始化函数(请求异步操作的函数 如async_write)及其参数有关。

同步操作需要临时资源(内存、文件描述符和线程)时,在从函数返回前临时资源会被释放。
异步操作需要临时资源时,在调用结束handler前被释放,便于可能产生的重复调用。

异步代理

异步代理是异步操作的顺序组成。每个异步操作被认为是一个异步代理的一部分,即使异步代理中只有一个异步操作。
一个异步代理可能和其他代理并发执行,异步代理将异步操作看做是线程去同步操作。

调用初始化函数后,产生异步操作,handler被调用。handler中再次调用初始化函数,产生异步操作,依次循环下去称为异步代理。

Associated Characteristics and Associators

异步代理的组成

  • allocator:决定代理中的异步操作如何获得内存资源
  • cancellation slot:决定代理中的异步操作如何支持取消
  • executor:决定代理的handler将如何进入队列,和被运行

异步操作将通过查询上述的内容,用于满足自己的需要和偏好。查询则是通过associator traits完成。

associator traits可以通过具体的handler类型定制

异步操作通过associator traits完成如下计算:(s为S类型的值,c为C类型的值)

  • 通过associated_R<S, C>::type获取type
  • 通过associated_R<S, C>::get(s, c)获取value

为了方便也可以通过如下的方式获取对应内容

  • associated_R_t<S, C>
  • get_associated_R(s, c)

子代理

子代理为多个嵌套的异步操作

异步代理可以由子代理构成,子代理之间切换时要注意上一个子代理的资源是否已经释放。

同步操作可以在同一个线程上简单的调用子函数,子函数功能不会发生改变。
异步代理通过不断的分享父代理的Characteristics,实现功能不会发生改变。

在分享Characteristics时,异步操作可能有选择性的分享父代理的Characteristics。

Executor

每个异步代理都有一个与之关联的executor,其决定代理的handler将如何进入队列,和被运行

executor的功能如下

  • 当handler中会处理共享资源时,协调不同的异步代理,保证每次只有一个代理执行handler。
  • 保证代理运行在指定的资源上(如CPU)。
  • 在GUI更新线程上,排列handler,保证他们可以安全的更新用户界面元素。
  • 可以控制在执行handler前后,去执行指定的代码(如记录机制,用户校验等)。
  • 指定异步代理和handler的优先级。

所以executor决定了异步代理何时、何地和如何运行。

Allocators - 未完

每个异步代理都有一个与之关联的allocator。allocator是用于获取per-operation stable memory resources (POSMs).

内存是以每次操作为单位分配的。

异步操作将会在如下途径利用POSMS

  • 操作不需要POSMS。如操作所包装的现有API,具有其自己的内存管理。或是将状态保存到现有的内存中,如现有的buffer。
  • 操作使用单个固定大小的POSM。如操作会将一些状态存储在链表中。
  • 操作使用单个动态大小的POSM。如操作存储用户buffer或动态结构体的拷贝。
  • 操作使用单个POSMS。如在链表中使用固定大小的POSM,在buffer中使用动态大小的POSM。

https://think-async.com/Asio/asio-1.22.1/doc/asio/overview/model/allocators.html

https://think-async.com/Asio/asio-1.22.1/doc/asio/overview/core/allocation.html 定制allocation

Cancellation

可以为异步代理指定cancel handler,当代理收到取消信号时cancel handler会被调用。cancel handler是以代理为单位设置的。

用途:当一个子代理完成时,可能就不需要另一个子代理,所以可以将另一个子代理取消。

https://think-async.com/Asio/asio-1.22.1/doc/asio/overview/core/cancellation.html 定制cancel

Completion Tokens

下面的例子中async_read_some的最后一个参数即为Completion Token。

// lambda将在操作完成后被调用。同时会将结果传递进来
socket.async_read_some(buffer,
    [](error_code e, size_t)
    {
      // ...
    }
  );


// 初始化函数不仅执行read操作,还返回了future
future<size_t> f =
  socket.async_read_some(
      buffer, use_future
    );
// ...
size_t n = f.get();


// 这里初始化函数不会立即执行read操作,只是会返回一个awaitable等待co_await时才执行操作。
awaitable<void> foo()
{
  size_t n =
    co_await socket.async_read_some(
        buffer, use_awaitable
      );
  // ...
}


// yield_context会让初始化函数表现为同步操作。初始化函数不仅会立即执行read操作,还会在结束前阻塞有栈协程
// 从有栈协程的角度看,操作是同步的。
void foo(asio::yield_context yield)
{
  size_t n = socket.async_read_some(buffer, yield);
  // ...
}

核心概念和功能

Proactor

线程与ASIO

一般来说多个线程操作同一个对象是不安全的,但是诸如io_context等提供了线程安全的强力保证。

可以使用线程池调用io_context::run()。其中的每个线程都是平等的,io_context将任务任意的分配给他们。
同时可以使用post向io_context中发送任务。

使用无锁编程

Buffer

ASIO中定义了如下两种缓冲区。分为可变和不可变buffer。实际实现使用的class而非pair。

typedef std::pair<void*, std::size_t> mutable_buffer;
typedef std::pair<const void*, std::size_t> const_buffer;

基于流的缓冲区

通过data访问const输入流,prepare访问mutable输出流。
使用commit将指定大小的数据从输出流首部移动到输入流尾部。
使用consume从输入流首部移除数据。

访问缓冲序列

asio::streambuf sb;
...
std::size_t n = asio::read_until(sock, sb, '\n');
asio::streambuf::const_buffers_type bufs = sb.data();
std::string line(
    asio::buffers_begin(bufs),
    asio::buffers_begin(bufs) + n);

流 简短的读取和写入

read_some
async_read_some
write_some
async_write_some

Reactor风格操作

ip::tcp::socket socket(my_io_context);
socket.non_blocking(true);
socket.async_wait(ip::tcp::socket::wait_read, read_handler);

// 可读时会调用
void read_handler(asio::error_code ec)
{
  if (!ec)
  {
    std::vector<char> buf(socket.available());
    socket.read_some(buffer(buf));
  }
}

基于行的操作。

class http_connection
{
  ...

  void start()
  {
    // 读取到\r\n
    asio::async_read_until(socket_, data_, "\r\n",
        boost::bind(&http_connection::handle_request_line, this, _1));
  }

  void handle_request_line(asio::error_code ec)
  {
    if (!ec)
    {
      std::string method, uri, version;
      char sp1, sp2, cr, lf;
      std::istream is(&data_);
      is.unsetf(std::ios_base::skipws);
      is >> method >> sp1 >> uri >> sp2 >> version >> cr >> lf;
      ...
    }
  }

  asio::ip::tcp::socket socket_;
  asio::streambuf data_;
};

定制内存分配

次序操作取消

handler跟踪

通过定义ASIO_ENABLE_HANDLER_TRACKING宏 开启。

@asio|1589424178.741850|0*1|signal_set@0x7ffee977d878.async_wait
@asio|1589424178.742593|0*2|socket@0x7ffee977d8a8.async_accept
@asio|1589424178.742619|.2|non_blocking_accept,ec=asio.system:11
@asio|1589424178.742625|0|resolver@0x7ffee977d760.cancel
@asio|1589424195.830382|.2|non_blocking_accept,ec=system:0
@asio|1589424195.830413|>2|ec=system:0
@asio|1589424195.830473|2*3|socket@0x7fa71d808230.async_receive
@asio|1589424195.830496|.3|non_blocking_recv,ec=system:0,bytes_transferred=151
@asio|1589424195.830503|2*4|socket@0x7ffee977d8a8.async_accept
@asio|1589424195.830507|.4|non_blocking_accept,ec=asio.system:11
@asio|1589424195.830510|<2|
@asio|1589424195.830529|>3|ec=system:0,bytes_transferred=151
@asio|1589424195.831143|3^5|in 'async_write' (./../../../include/asio/impl/write.hpp:330)
@asio|1589424195.831143|3*5|socket@0x7fa71d808230.async_send
@asio|1589424195.831186|.5|non_blocking_send,ec=system:0,bytes_transferred=1090
@asio|1589424195.831194|<3|
@asio|1589424195.831218|>5|ec=system:0,bytes_transferred=1090
@asio|1589424195.831263|5|socket@0x7fa71d808230.close
@asio|1589424195.831298|<5|
@asio|1589424199.793770|>1|ec=system:0,signal_number=2
@asio|1589424199.793781|1|socket@0x7ffee977d8a8.close
@asio|1589424199.793809|<1|
@asio|1589424199.793840|>4|ec=asio.system:125
@asio|1589424199.793854|<4|
@asio|1589424199.793883|0|signal_set@0x7ffee977d878.cancel

Composition and Completion Tokens

无栈协程

同步了两个异步操作

struct session : asio::coroutine
{
  boost::shared_ptr<tcp::socket> socket_;
  boost::shared_ptr<std::vector<char> > buffer_;

  session(boost::shared_ptr<tcp::socket> socket)
    : socket_(socket),
      buffer_(new std::vector<char>(1024))
  {
  }

  void operator()(asio::error_code ec = asio::error_code(), std::size_t n = 0)
  {
    if (!ec) reenter (this)
    {
      for (;;)
      {
        yield socket_->async_read_some(asio::buffer(*buffer_), *this);
        yield asio::async_write(*socket_, asio::buffer(*buffer_, n), *this);
      }
    }
  }
};

有栈协程

// 第一个参数可以为strand、io_context或者completion handler
asio::spawn(my_strand, do_echo);

// ...

void do_echo(asio::yield_context yield)
{
  try
  {
    char data[128];
    for (;;)
    {
      std::size_t length =
        my_socket.async_read_some(
          asio::buffer(data), yield);

      asio::async_write(my_socket,
          asio::buffer(data, length), yield);
    }
  }
  catch (std::exception& e)
  {
    // ...
  }
}

C++20协程支持

// 第一个参数可以为strand、io_context或者completion handler
asio::co_spawn(executor, echo(std::move(socket)), asio::detached);

// ...

asio::awaitable<void> echo(tcp::socket socket)
{
  try
  {
    char data[1024];
    for (;;)
    {
      // 指定 asio::use_awaitable 后初始化函数会返回可被co_await调用的awaitable
      std::size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
      co_await async_write(socket, asio::buffer(data, n), asio::use_awaitable);
    }
  }
  catch (std::exception& e)
  {
    std::printf("echo Exception: %s\n", e.what());
  }
}

增加异常处理

asio::awaitable<void> echo(tcp::socket socket)
{
  char data[1024];
  for (;;)
  {
    std::tuple<asio::error_code, std::size_t> result =
      co_await socket.async_read_some(asio::buffer(data),
        asio::experimental::as_tuple(asio::use_awaitable));
    if (!std::get<0>(result))
    {
      // success
    }

    // ...
  }
}

asio::awaitable<void> echo(tcp::socket socket)
{
  char data[1024];
  for (;;)
  {
    auto [ec, n] = co_await socket.async_read_some(asio::buffer(data),
        asio::experimental::as_tuple(asio::use_awaitable));
    if (!ec)
    {
      // success
    }

    // ...
  }
}

asio::awaitable<void> echo(tcp::socket socket)
{
  char data[1024];
  for (;;)
  {
    asio::error_code ec;
    std::size_t n = co_await socket.async_read_some(asio::buffer(data),
        asio::redirect_error(asio::use_awaitable, ec));
    if (!ec)
    {
      // success
    }

    // ...
  }
}

可恢复的C++20协程

网络

TCP UDP ICMP

TCP

// 解析域名 产生endpoint
ip::tcp::resolver resolver(my_io_context);
ip::tcp::resolver::query query("www.boost.org", "http");
ip::tcp::resolver::iterator iter = resolver.resolve(query);
ip::tcp::resolver::iterator end; // End marker.
while (iter != end)
{
  ip::tcp::endpoint endpoint = *iter++;
  std::cout << endpoint << std::endl;
}

// 创建client
ip::tcp::socket socket(my_io_context);
socket.connect(endpoint);

// 创建server
ip::tcp::acceptor acceptor(my_io_context, my_endpoint);
ip::tcp::socket socket(my_io_context);
acceptor.accept(socket);

UDP

// 解析域名 产生endpoint
ip::udp::resolver resolver(my_io_context);
ip::udp::resolver::query query("localhost", "daytime");
ip::udp::resolver::iterator iter = resolver.resolve(query);
ip::udp::resolver::iterator end; // End marker.
while (iter != end)
{
  ip::udp::endpoint endpoint = *iter++;
  std::cout << endpoint << std::endl;
}

// bind to any
ip::udp::endpoint endpoint(ip::udp::v4(), 12345);
ip::udp::socket socket(my_io_context, endpoint);

code

C++20 协程

#define _WIN32_WINNT 0x0601

#include <iostream>
#include <coroutine>
#include <asio.hpp>
#include <vector>
#include <format>

struct ReadAwaitable
{
    ReadAwaitable(asio::ip::tcp::socket* client_socket,
        std::shared_ptr<std::vector<char>> buffer_ptr):
        client_socket_(client_socket),
        buffer_ptr_(buffer_ptr),
        result_(0)
    {

    }
    bool await_ready() const { return false; }
    void await_suspend(std::coroutine_handle<> handle)
    {
        client_socket_->async_read_some(asio::buffer(*buffer_ptr_),
            [this, handle](const asio::error_code& ec, std::size_t n)
            {
                result_ = n;
                handle.resume();
            });
    }
    std::size_t await_resume()
    {
        return result_;
    }

    asio::ip::tcp::socket* client_socket_;
    std::shared_ptr<std::vector<char>> buffer_ptr_;
    std::size_t result_;
};

struct WriteAwaitable
{
    WriteAwaitable(asio::ip::tcp::socket* client_socket,
        const char* buffer_ptr, std::size_t length) :
        client_socket_(client_socket),
        buffer_ptr_(buffer_ptr),
        length_(length)
    {

    }
    bool await_ready() const { return false; }
    void await_suspend(std::coroutine_handle<> handle)
    {
        // async_send 不保证发送所有的数据
        // async_write_some 不保证发送所有的数据
    // async_write 保证不出错时 发送所有数据,否则发送n字节数据
        asio::async_write(*client_socket_, asio::buffer(buffer_ptr_, length_),
            [handle](const asio::error_code& ec, std::size_t n)
            {
                handle.resume();
            });
    }
    void await_resume()
    {
    }

    asio::ip::tcp::socket* client_socket_;
    const char* buffer_ptr_;
    std::size_t length_;
};

struct Task
{
    struct promise_type
    {
        auto get_return_object()
        { 
            return Task{};
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void unhandled_exception() { std::terminate(); }
        void return_void() {}
    };
};

Task OnNewClient(asio::ip::tcp::socket client_socket)
{
    auto buffer_ptr = std::make_shared<std::vector<char>>(1024);
    while (true)
    {
        std::size_t n = co_await ReadAwaitable(&client_socket, buffer_ptr);
        if (n == 0)
        {
            break;
        }
        co_await WriteAwaitable(&client_socket, buffer_ptr->data(), n);
        std::cout << std::format("Echo: {}", n) << std::endl;
    }
}

int main()
{
    asio::io_context context;
    asio::ip::tcp::endpoint bind_addr(asio::ip::tcp::v4(), 2048);
    asio::ip::tcp::acceptor acceptor(context, bind_addr);


    acceptor.async_accept(
        [](const asio::error_code& error, asio::ip::tcp::socket client_socket)
        {
            if (error)
            {
                std::cout << error;
            }
            else
            {
                OnNewClient(std::move(client_socket));
            }
        });

    context.run();
    return 0;
}