文档链接 https://think-async.com/Asio/asio-1.22.1/doc/asio/overview.html
asio基本分析
io_context
是作为调用操作系统接口的中间层
同步模式
当发生错误的时候,如果没有指定error_code
,则会抛出异常。 false作为没有异常出现
异步模式
调用的异步请求API,会产生一个task。
当task对应的操作结束后,task会被放入队列中,等待io_context::run()
读取队列中的task进行分发。
异步模型
异步操作
同步操作的返回值与调用的函数及其参数有关。
异步操作handler参数的类型和顺序与初始化函数(请求异步操作的函数 如async_write)及其参数有关。
同步操作需要临时资源(内存、文件描述符和线程)时,在从函数返回前临时资源会被释放。
异步操作需要临时资源时,在调用结束handler前被释放,便于可能产生的重复调用。
异步代理
异步代理是异步操作的顺序组成。每个异步操作被认为是一个异步代理的一部分,即使异步代理中只有一个异步操作。
一个异步代理可能和其他代理并发执行,异步代理将异步操作看做是线程去同步操作。
调用初始化函数后,产生异步操作,handler被调用。handler中再次调用初始化函数,产生异步操作,依次循环下去称为异步代理。
Associated Characteristics and Associators
异步代理的组成
- allocator:决定代理中的异步操作如何获得内存资源
- cancellation slot:决定代理中的异步操作如何支持取消
- executor:决定代理的handler将如何进入队列,和被运行
异步操作将通过查询上述的内容,用于满足自己的需要和偏好。查询则是通过associator traits完成。
associator traits可以通过具体的handler类型定制
异步操作通过associator traits完成如下计算:(s为S类型的值,c为C类型的值)
- 通过
associated_R<S, C>::type
获取type - 通过
associated_R<S, C>::get(s, c)
获取value
为了方便也可以通过如下的方式获取对应内容
associated_R_t<S, C>
get_associated_R(s, c)
子代理
子代理为多个嵌套的异步操作
异步代理可以由子代理构成,子代理之间切换时要注意上一个子代理的资源是否已经释放。
同步操作可以在同一个线程上简单的调用子函数,子函数功能不会发生改变。
异步代理通过不断的分享父代理的Characteristics,实现功能不会发生改变。
在分享Characteristics时,异步操作可能有选择性的分享父代理的Characteristics。
Executor
每个异步代理都有一个与之关联的executor,其决定代理的handler将如何进入队列,和被运行
executor的功能如下
- 当handler中会处理共享资源时,协调不同的异步代理,保证每次只有一个代理执行handler。
- 保证代理运行在指定的资源上(如CPU)。
- 在GUI更新线程上,排列handler,保证他们可以安全的更新用户界面元素。
- 可以控制在执行handler前后,去执行指定的代码(如记录机制,用户校验等)。
- 指定异步代理和handler的优先级。
所以executor决定了异步代理何时、何地和如何运行。
Allocators - 未完
每个异步代理都有一个与之关联的allocator。allocator是用于获取per-operation stable memory resources (POSMs).
内存是以每次操作为单位分配的。
异步操作将会在如下途径利用POSMS
- 操作不需要POSMS。如操作所包装的现有API,具有其自己的内存管理。或是将状态保存到现有的内存中,如现有的buffer。
- 操作使用单个固定大小的POSM。如操作会将一些状态存储在链表中。
- 操作使用单个动态大小的POSM。如操作存储用户buffer或动态结构体的拷贝。
- 操作使用单个POSMS。如在链表中使用固定大小的POSM,在buffer中使用动态大小的POSM。
https://think-async.com/Asio/asio-1.22.1/doc/asio/overview/model/allocators.html
https://think-async.com/Asio/asio-1.22.1/doc/asio/overview/core/allocation.html 定制allocation
Cancellation
可以为异步代理指定cancel handler,当代理收到取消信号时cancel handler会被调用。cancel handler是以代理为单位设置的。
用途:当一个子代理完成时,可能就不需要另一个子代理,所以可以将另一个子代理取消。
https://think-async.com/Asio/asio-1.22.1/doc/asio/overview/core/cancellation.html 定制cancel
Completion Tokens
下面的例子中async_read_some的最后一个参数即为Completion Token。
// lambda将在操作完成后被调用。同时会将结果传递进来
socket.async_read_some(buffer,
[](error_code e, size_t)
{
// ...
}
);
// 初始化函数不仅执行read操作,还返回了future
future<size_t> f =
socket.async_read_some(
buffer, use_future
);
// ...
size_t n = f.get();
// 这里初始化函数不会立即执行read操作,只是会返回一个awaitable等待co_await时才执行操作。
awaitable<void> foo()
{
size_t n =
co_await socket.async_read_some(
buffer, use_awaitable
);
// ...
}
// yield_context会让初始化函数表现为同步操作。初始化函数不仅会立即执行read操作,还会在结束前阻塞有栈协程
// 从有栈协程的角度看,操作是同步的。
void foo(asio::yield_context yield)
{
size_t n = socket.async_read_some(buffer, yield);
// ...
}
核心概念和功能
Proactor
线程与ASIO
一般来说多个线程操作同一个对象是不安全的,但是诸如io_context等提供了线程安全的强力保证。
可以使用线程池调用io_context::run()
。其中的每个线程都是平等的,io_context将任务任意的分配给他们。
同时可以使用post向io_context中发送任务。
使用无锁编程
Buffer
ASIO中定义了如下两种缓冲区。分为可变和不可变buffer。实际实现使用的class而非pair。
typedef std::pair<void*, std::size_t> mutable_buffer;
typedef std::pair<const void*, std::size_t> const_buffer;
基于流的缓冲区
通过data访问const输入流,prepare访问mutable输出流。
使用commit将指定大小的数据从输出流首部移动到输入流尾部。
使用consume从输入流首部移除数据。
访问缓冲序列
asio::streambuf sb;
...
std::size_t n = asio::read_until(sock, sb, '\n');
asio::streambuf::const_buffers_type bufs = sb.data();
std::string line(
asio::buffers_begin(bufs),
asio::buffers_begin(bufs) + n);
流 简短的读取和写入
read_some
async_read_some
write_some
async_write_some
Reactor风格操作
ip::tcp::socket socket(my_io_context);
socket.non_blocking(true);
socket.async_wait(ip::tcp::socket::wait_read, read_handler);
// 可读时会调用
void read_handler(asio::error_code ec)
{
if (!ec)
{
std::vector<char> buf(socket.available());
socket.read_some(buffer(buf));
}
}
基于行的操作。
class http_connection
{
...
void start()
{
// 读取到\r\n
asio::async_read_until(socket_, data_, "\r\n",
boost::bind(&http_connection::handle_request_line, this, _1));
}
void handle_request_line(asio::error_code ec)
{
if (!ec)
{
std::string method, uri, version;
char sp1, sp2, cr, lf;
std::istream is(&data_);
is.unsetf(std::ios_base::skipws);
is >> method >> sp1 >> uri >> sp2 >> version >> cr >> lf;
...
}
}
asio::ip::tcp::socket socket_;
asio::streambuf data_;
};
定制内存分配
次序操作取消
handler跟踪
通过定义ASIO_ENABLE_HANDLER_TRACKING宏 开启。
@asio|1589424178.741850|0*1|signal_set@0x7ffee977d878.async_wait
@asio|1589424178.742593|0*2|socket@0x7ffee977d8a8.async_accept
@asio|1589424178.742619|.2|non_blocking_accept,ec=asio.system:11
@asio|1589424178.742625|0|resolver@0x7ffee977d760.cancel
@asio|1589424195.830382|.2|non_blocking_accept,ec=system:0
@asio|1589424195.830413|>2|ec=system:0
@asio|1589424195.830473|2*3|socket@0x7fa71d808230.async_receive
@asio|1589424195.830496|.3|non_blocking_recv,ec=system:0,bytes_transferred=151
@asio|1589424195.830503|2*4|socket@0x7ffee977d8a8.async_accept
@asio|1589424195.830507|.4|non_blocking_accept,ec=asio.system:11
@asio|1589424195.830510|<2|
@asio|1589424195.830529|>3|ec=system:0,bytes_transferred=151
@asio|1589424195.831143|3^5|in 'async_write' (./../../../include/asio/impl/write.hpp:330)
@asio|1589424195.831143|3*5|socket@0x7fa71d808230.async_send
@asio|1589424195.831186|.5|non_blocking_send,ec=system:0,bytes_transferred=1090
@asio|1589424195.831194|<3|
@asio|1589424195.831218|>5|ec=system:0,bytes_transferred=1090
@asio|1589424195.831263|5|socket@0x7fa71d808230.close
@asio|1589424195.831298|<5|
@asio|1589424199.793770|>1|ec=system:0,signal_number=2
@asio|1589424199.793781|1|socket@0x7ffee977d8a8.close
@asio|1589424199.793809|<1|
@asio|1589424199.793840|>4|ec=asio.system:125
@asio|1589424199.793854|<4|
@asio|1589424199.793883|0|signal_set@0x7ffee977d878.cancel
Composition and Completion Tokens
无栈协程
同步了两个异步操作
struct session : asio::coroutine
{
boost::shared_ptr<tcp::socket> socket_;
boost::shared_ptr<std::vector<char> > buffer_;
session(boost::shared_ptr<tcp::socket> socket)
: socket_(socket),
buffer_(new std::vector<char>(1024))
{
}
void operator()(asio::error_code ec = asio::error_code(), std::size_t n = 0)
{
if (!ec) reenter (this)
{
for (;;)
{
yield socket_->async_read_some(asio::buffer(*buffer_), *this);
yield asio::async_write(*socket_, asio::buffer(*buffer_, n), *this);
}
}
}
};
有栈协程
// 第一个参数可以为strand、io_context或者completion handler
asio::spawn(my_strand, do_echo);
// ...
void do_echo(asio::yield_context yield)
{
try
{
char data[128];
for (;;)
{
std::size_t length =
my_socket.async_read_some(
asio::buffer(data), yield);
asio::async_write(my_socket,
asio::buffer(data, length), yield);
}
}
catch (std::exception& e)
{
// ...
}
}
C++20协程支持
// 第一个参数可以为strand、io_context或者completion handler
asio::co_spawn(executor, echo(std::move(socket)), asio::detached);
// ...
asio::awaitable<void> echo(tcp::socket socket)
{
try
{
char data[1024];
for (;;)
{
// 指定 asio::use_awaitable 后初始化函数会返回可被co_await调用的awaitable
std::size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
co_await async_write(socket, asio::buffer(data, n), asio::use_awaitable);
}
}
catch (std::exception& e)
{
std::printf("echo Exception: %s\n", e.what());
}
}
增加异常处理
asio::awaitable<void> echo(tcp::socket socket)
{
char data[1024];
for (;;)
{
std::tuple<asio::error_code, std::size_t> result =
co_await socket.async_read_some(asio::buffer(data),
asio::experimental::as_tuple(asio::use_awaitable));
if (!std::get<0>(result))
{
// success
}
// ...
}
}
asio::awaitable<void> echo(tcp::socket socket)
{
char data[1024];
for (;;)
{
auto [ec, n] = co_await socket.async_read_some(asio::buffer(data),
asio::experimental::as_tuple(asio::use_awaitable));
if (!ec)
{
// success
}
// ...
}
}
asio::awaitable<void> echo(tcp::socket socket)
{
char data[1024];
for (;;)
{
asio::error_code ec;
std::size_t n = co_await socket.async_read_some(asio::buffer(data),
asio::redirect_error(asio::use_awaitable, ec));
if (!ec)
{
// success
}
// ...
}
}
可恢复的C++20协程
网络
TCP UDP ICMP
TCP
// 解析域名 产生endpoint
ip::tcp::resolver resolver(my_io_context);
ip::tcp::resolver::query query("www.boost.org", "http");
ip::tcp::resolver::iterator iter = resolver.resolve(query);
ip::tcp::resolver::iterator end; // End marker.
while (iter != end)
{
ip::tcp::endpoint endpoint = *iter++;
std::cout << endpoint << std::endl;
}
// 创建client
ip::tcp::socket socket(my_io_context);
socket.connect(endpoint);
// 创建server
ip::tcp::acceptor acceptor(my_io_context, my_endpoint);
ip::tcp::socket socket(my_io_context);
acceptor.accept(socket);
UDP
// 解析域名 产生endpoint
ip::udp::resolver resolver(my_io_context);
ip::udp::resolver::query query("localhost", "daytime");
ip::udp::resolver::iterator iter = resolver.resolve(query);
ip::udp::resolver::iterator end; // End marker.
while (iter != end)
{
ip::udp::endpoint endpoint = *iter++;
std::cout << endpoint << std::endl;
}
// bind to any
ip::udp::endpoint endpoint(ip::udp::v4(), 12345);
ip::udp::socket socket(my_io_context, endpoint);
code
C++20 协程
#define _WIN32_WINNT 0x0601
#include <iostream>
#include <coroutine>
#include <asio.hpp>
#include <vector>
#include <format>
struct ReadAwaitable
{
ReadAwaitable(asio::ip::tcp::socket* client_socket,
std::shared_ptr<std::vector<char>> buffer_ptr):
client_socket_(client_socket),
buffer_ptr_(buffer_ptr),
result_(0)
{
}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle)
{
client_socket_->async_read_some(asio::buffer(*buffer_ptr_),
[this, handle](const asio::error_code& ec, std::size_t n)
{
result_ = n;
handle.resume();
});
}
std::size_t await_resume()
{
return result_;
}
asio::ip::tcp::socket* client_socket_;
std::shared_ptr<std::vector<char>> buffer_ptr_;
std::size_t result_;
};
struct WriteAwaitable
{
WriteAwaitable(asio::ip::tcp::socket* client_socket,
const char* buffer_ptr, std::size_t length) :
client_socket_(client_socket),
buffer_ptr_(buffer_ptr),
length_(length)
{
}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle)
{
// async_send 不保证发送所有的数据
// async_write_some 不保证发送所有的数据
// async_write 保证不出错时 发送所有数据,否则发送n字节数据
asio::async_write(*client_socket_, asio::buffer(buffer_ptr_, length_),
[handle](const asio::error_code& ec, std::size_t n)
{
handle.resume();
});
}
void await_resume()
{
}
asio::ip::tcp::socket* client_socket_;
const char* buffer_ptr_;
std::size_t length_;
};
struct Task
{
struct promise_type
{
auto get_return_object()
{
return Task{};
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void unhandled_exception() { std::terminate(); }
void return_void() {}
};
};
Task OnNewClient(asio::ip::tcp::socket client_socket)
{
auto buffer_ptr = std::make_shared<std::vector<char>>(1024);
while (true)
{
std::size_t n = co_await ReadAwaitable(&client_socket, buffer_ptr);
if (n == 0)
{
break;
}
co_await WriteAwaitable(&client_socket, buffer_ptr->data(), n);
std::cout << std::format("Echo: {}", n) << std::endl;
}
}
int main()
{
asio::io_context context;
asio::ip::tcp::endpoint bind_addr(asio::ip::tcp::v4(), 2048);
asio::ip::tcp::acceptor acceptor(context, bind_addr);
acceptor.async_accept(
[](const asio::error_code& error, asio::ip::tcp::socket client_socket)
{
if (error)
{
std::cout << error;
}
else
{
OnNewClient(std::move(client_socket));
}
});
context.run();
return 0;
}