异步 IO 的整洁代码

2022-01-04 00:00:00 asynchronous c io c++

虽然异步 IO(带有 select/poll/epoll/kqueue 等的非阻塞描述符)并不是网络上记录最多的东西,但有一些很好的例子.

Whilst asynchronous IO (non-blocking descriptors with select/poll/epoll/kqueue etc) is not the most documented thing on the web, there are a handful of good examples.

然而,所有这些例子,在确定了调用返回的句柄后,只有一个do_some_io(fd)"存根.他们并没有真正解释如何在这种方法中最好地处理实际的异步 IO.

However, all these examples, having determined the handles that are returned by the call, just have a 'do_some_io(fd)' stub. They don't really explain how to best approach the actual asynchronous IO in such a method.

Blocking IO 非常整洁且易于阅读代码.另一方面,非阻塞的异步 IO 是多毛和凌乱的.

Blocking IO is very tidy and straightforward to read code. Non-blocking, async IO is, on the other hand, hairy and messy.

有哪些方法?什么是健壮和可读的?

What approaches are there? What are robust and readable?

void do_some_io(int fd) {
  switch(state) {
    case STEP1:
       ... async calls
       if(io_would_block)
          return;
       state = STEP2;
    case STEP2:
       ... more async calls
       if(io_would_block)
          return;
       state = STEP3;
    case STEP3:
       ...
  }
}

或者(ab)使用 GCC 的计算得到的 goto:

or perhaps (ab)using GCC's computed gotos:

#define concatentate(x,y) x##y
#define async_read_xx(var,bytes,line)       
   concatentate(jmp,line):                  
   if(!do_async_read(bytes,&var)) {         
       schedule(EPOLLIN);                   
       jmp_read = &&concatentate(jmp,line); 
       return;                              
}

// macros for making async code read like sync code
#define async_read(var,bytes) 
    async_read_xx(var,bytes,__LINE__)

#define async_resume()            
     if(jmp_read) {               
         void* target = jmp_read; 
         jmp_read = NULL;         
         goto *target;            
     }

void do_some_io() {
   async_resume();
   async_read(something,sizeof(something));
   async_read(something_else,sizeof(something_else));
}

或者可能是 C++ 异常和状态机,因此工作函数可以触发中止/恢复位,或者可能是表驱动的状态机?

Or perhaps C++ exceptions and a state machine, so worker functions can trigger the abort/resume bit, or perhaps a table-driven state-machine?

我追求的不是如何让它工作,而是如何使它可维护!

Its not how to make it work, its how to make it maintainable that I'm chasing!

推荐答案

我建议看看:http://www.kegel.com/c10k.html,第二次看看现有的库,比如 libevent、Boost.Asio 已经完成了这项工作,看看它们是如何工作的.

I suggest take a look on: http://www.kegel.com/c10k.html, second take a look on existing libraries like libevent, Boost.Asio that already do the job and see how they work.

关键是每种类型的系统调用的方法可能不同:

The point is that the approach may be different for each type of system call:

  • select 是一个简单的反应器
  • epoll 有边缘或水平触发接口,需要不同的方法
  • iocp 是 proactor 需要其他方法

建议:使用良好的现有库,例如用于 C++ 的 Boost.Asio 或用于 C 的 libevent.

Suggestion: use good existing library like Boost.Asio for C++ or libevent for C.

这是 ASIO 的处理方式

This is how ASIO handles this

class connection {
   boost::asio:ip::tcp::socket socket_;
public:
   void run()
   {
         // for variable length chunks
         async_read_until(socket_,resizable_buffer,'
',
               boost::bind(&run::on_line_recieved,this,errorplacehplder);
         // or constant length chunks
         async_read(socket_,buffer(some_buf,buf_size),
               boost::bind(&run::on_line_recieved,this,errorplacehplder);
   }
   void on_line_recieved(error e)
   {
        // handle it
        run();
   }

};

因为 ASIO 作为 proactor 它会在操作完成时通知您并且在内部处理 EWOULDBLOCK.

Because ASIO works as proactor it notifies you when operation is complete and handles EWOULDBLOCK internally.

如果你说反应堆,你可以模拟这种行为:

If you word as reactor you may simulate this behavior:

 class conn {
    // Application logic

    void run() {
       read_chunk(&conn::on_chunk_read,size);
    }
    void on_chunk_read() {
         /* do something;*/
    }

    // Proactor wrappers

    void read_chunk(void (conn::*callback),int size, int start_point=0) {
       read(socket,buffer+start,size)
       if( complete )
          (this->*callback()
       else {
          this -> tmp_size-=size-read;
          this -> tmp_start=start+read;
          this -> tmp_callback=callback
          your_event_library_register_op_on_readable(callback,socket,this);
       }
    }
    void callback()
    {
       read_chunk(tmp_callback,tmp_size,tmp_start);
    }
 }

类似的东西.

相关文章