并发程序中的 I/O

2021-12-28 00:00:00 io concurrency c++

我正在开发一个并发程序;它有两个线程,一个监听来自服务器的消息,另一个向它发送消息.我需要从用户那里获取命令(使用 cin?)并同时显示来自服务器的消息.

I'm working on a concurrent program; it has two threads, one of which listens messages from a server and the other one sends messages to it. I need to obtain commands from the user (using cin?) and show messages coming from the server both at the same time.

我该如何处理这种情况?问题是,如果我在收到消息时读取用户的命令,则用户的输入会与其他内容混淆.

How can I handle that situation? The problem is that if I'm reading a command from the user when a message comes, the user's input is messed up with other stuff.

提前致谢

推荐答案

我拿了我的旧示例代码并试图把它变成一个 MCVE.(最小"并不一定意味着短",是吗?)

I took my old sample code and tried to turn it into an MCVE. ("Minimal" does not necessarily mean "short", does it?)

这是一个非常简单的shell"概念,它支持一个线程进行输入,而多个线程可以进行输出.

This is a very simple concept of a "shell" which supports one thread for input while multiple threads may do output.

  1. 键盘输入无回声.这是不可移植的.因此我提供了两个函数 getChar() 的实现 –一个用于 MS Windows,另一个用于非 MS Windows(实际上只考虑 *ix 操作系统).后者受到 SO:如何在 Linux 中实现 C 的 getch() 函数的强烈启发"?.

  1. The keyboard input is done non-echoing. This is non-portable. Therefore I provide two implementations of function getChar() – one for MS Windows and another for non-MS Windows (which considers actually only *ix OSes). The latter is "strongly inspired" by SO: How to implement getch() function of C in Linux?.

输入的字符存储在std::string中.

输出擦除提示和当前输入文本(重复 " " 的输出),打印输出文本(包括换行符),并打印再次提示和当前输入缓冲区.

The output erases the prompt and the current input text (repeating the output of " " resp.), prints the output text (incl. newline), and prints the prompt and current input buffer again.

输出受互斥锁保护以授予线程安全性.

The output is mutex guarded to grant thread-safety.

这是示例代码miniShell.cc:

// system header:
#ifdef _WIN32
#include <conio.h>
#else // (not) _WIN32
#include <termios.h>
#include <unistd.h>
#include <stdio.h>
#endif // _WIN32

/// reads a character from console without echo.
#ifdef _WIN32
inline int getChar() { return _getch(); }
#else // (not) _WIN32
int getChar()
{
  struct termios oldattr;
  tcgetattr(STDIN_FILENO, &oldattr);
  struct termios newattr = oldattr;
  newattr.c_lflag &= ~(ICANON | ECHO);
  tcsetattr(STDIN_FILENO, TCSANOW, &newattr);
  const int ch = getchar();
  tcsetattr(STDIN_FILENO, TCSANOW, &oldattr);
  return ch;
}
#endif // _WIN32

// standard C/C++ header:
#include <cstring>
#include <mutex>
#include <string>

/* provides a class for a simple thread-safe mini-shell.
 *
 * It is assumed that one thread may await user input (read()) while
 * another thread may (or may not) output text from time to time.
 * The mini-shell grants that the input line is always the last line.
 */
class Console {

  // variable:
  private:
    // mutex for console I/O
    std::mutex _mtx;
    // current input
    std::string _input;
    // prompt output
    std::string _prompt;

  // methods:
  public:
    /// constructor.
    Console() { }

    // disabled:
    Console(const Console&) = delete;
    Console& operator = (const Console&) = delete;

    // reads a line from console and returns input string
    std::string read();

    /* writes text to console.
     *
     * text the text
     * size size of text
     */
    void write(const char *text, size_t size);
    void write(const char *text) { write(text, strlen(text)); }
    void write(const std::string &text) { write(text.c_str(), text.size()); }
};

// standard C/C++ header:
#include <atomic>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>

std::string Console::read()
{
  { // activate prompt
    std::lock_guard<std::mutex> lock(_mtx);
    _prompt = "> "; _input.clear();
    std::cout << _prompt << std::flush;
  }
#ifdef _WIN32
  enum { Enter = '', BackSpc = '' };
#else // (not) _WIN32
  enum { Enter = '
', BackSpc = 127 };
#endif // _WIN32
  // input loop
  for (;;) {
    switch (int c = getChar()) {
      case Enter: {
        std::lock_guard<std::mutex> lock(_mtx);
        std::string input = _input;
        _prompt.clear(); _input.clear();
        std::cout << std::endl;
        return input;
      } // unreachable: break;
      case BackSpc: {
        std::lock_guard<std::mutex> lock(_mtx);
        if (_input.empty()) break; // nothing to do
        _input.pop_back();
        std::cout << " " << std::flush;
      } break;
      default: {
        if (c < ' ' || c >= 'x7f') break;
        std::lock_guard<std::mutex> lock(_mtx);
        _input += c;
        std::cout << (char)c << std::flush;
      } break;
    }
  }
}

void Console::write(const char *text, size_t len)
{
  if (!len) return; // nothing to do
  bool eol = text[len - 1] == '
';
  std::lock_guard<std::mutex> lock(_mtx);
  // remove current input echo
  if (size_t size = _prompt.size() + _input.size()) {
    std::cout
      << std::setfill('') << std::setw(size) << ""
      << std::setfill(' ') << std::setw(size) << ""
      << std::setfill('') << std::setw(size) << "";
  }
  // print text
  std::cout << text;
  if (!eol) std::cout << std::endl;
  // print current input echo
  std::cout << _prompt << _input << std::flush;
}

// a sample application

// shared data for main thread and data processing thread
struct Shared {
  // flag: true ... exit communication thread and main loop
  std::atomic<bool> exit;
  // flag: true ... start data processing
  std::atomic<bool> start;
  // the mini console
  Console console;

  // constructor.
  Shared(): exit(false), start(true) { }
};

void dataProc(Shared &shared)
{
  while (!shared.exit) {
    // "busy" wait for start (condition would be more elegant)
    while (!shared.start) {
      if (shared.exit) return;
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // do data processing
    shared.console.write("Starting data processing.");
    for (int i = 0, n = 20; i < n; ++i) {
      // "busy" wait for start (condition would be more elegant)
      if (!shared.start) {
        shared.console.write("Data processing stopped.");
        while (!shared.start) {
          if (shared.exit) return;
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        shared.console.write("Data processing restarted.");
      }
      // consume some time (to simulate heavy computation)
      std::this_thread::sleep_for(std::chrono::milliseconds(250));
      // do some console output about progress
      { std::ostringstream fmt;
        fmt << "Step " << i + 1 << '/' << n;
        shared.console.write(fmt.str());
      }
    }
    shared.console.write("Data processing done.");
    shared.start = false;
  }
}

void processInput(const std::string &input, Shared &shared)
{
  if (input == "start") shared.start = true;
  else if (input == "stop") shared.start = false;
  else if (input == "exit") shared.exit = true;
  else if (input.size()) shared.console.write("Wrong command!");
}

int main()
{
  Shared shared;
  // start a thread for some kind of data processing
  std::thread threadDataProc(&dataProc, std::ref(shared));
  // main loop
  while (!shared.exit) {
    shared.console.write("Commands: start stop exit");
    std::string input = shared.console.read();
    processInput(input, shared);
  }
  // join data processing thread
  threadDataProc.join();
  // done
  return 0;
}

我在 VS2013 和 Windows 10 上的 cygwin 的 bash/Xterm 中编译和测试.(cygwin 是我手头上最接近 Linux 的.)

I compiled and tested in VS2013 vs. bash/Xterm of cygwin on Windows 10. (cygwin was the closest to Linux I have at hand.)

请记住,在我编写这段代码时,简单比完美或舒适更重要.

Please, keep in mind that I wrote this code whereby simplicity was more important than perfection or comfort.

相关文章