在C++11中标准库引入了std::thread, 支持多线程编程. 但是std::thread不是一个RAII对象, 也就是在对象销毁之前, 需要手动调用join或者detach方法. 如果忘记了调用, 程序就会崩溃. 为了解决这个问题, C++20引入了std::jthread类, 它是一个RAII对象, 在对象销毁时, 会自动调用join方法. 引入std::jthread而不修改std::thread的原因是为了向后兼容.

std::jthread同时还引入了其他特性, 比如request_stop方法, 用于请求线程停止. 本文将介绍这些新特性.

创建jthread对象

std::jthread无需手动调用join或者detach方法, 在对象销毁时, 会自动调用join方法. 以下是创建std::jthread对象的例子:

#include <iostream>
#include <thread>

int main() {
  for (unsigned i = 0; i < std::jthread::hardware_concurrency(); i++) {
    std::jthread j(
        [i] { std::cout << "Hello from thread[" << i << "]" << std::endl; });
  }
}

请求线程停止

std::jthread引入了request_stop方法, 用于请求线程停止. 以下是一个例子:

#include <iostream>
#include <thread>

void task(const std::stop_token& st) {
  while (!st.stop_requested()) {
    std::cout << "Task running" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
  std::cout << "Task stopped" << std::endl;
}

int main() {
  std::jthread j(task);
  std::this_thread::sleep_for(std::chrono::seconds(2));
  j.request_stop();
  return 0;
}

程序输出:

Task running
Task running
Task stopped

这个输出并非固定, 也会出现Task running输出三次的情况. 具体输出取决于线程调度.

线程执行的函数 void task(const std::stop_token& st) 接受一个std::stop_token参数, 用于检查是否请求停止. 在main函数中, 等待2秒后, 调用j.request_stop()方法请求线程停止. 这个方法适用于线程函数中有循环的情况.

线程停止回调

std::jthread 支持线程停止回调std::stop_callback, 该函数会在线程停止时执行. 以下是一个例子:

#include <iostream>
#include <thread>

void task_with_callback(const std::stop_token& st) {
  std::stop_callback cb(st, [] { std::cout << "Task2 exited" << std::endl; });

  while (!st.stop_requested()) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}

int main() {
  std::jthread j(task_with_callback);
  j.request_stop();
  return 0;
}

程序输出:

Task2 exited

task_with_callback函数中, 创建了一个std::stop_callback对象, 用于在线程停止时执行. 在main函数中, 调用j.request_stop()方法请求线程停止, 然后线程停止回调函数被执行.

在条件变量中使用request_stop

设想一下这个场景, 在生产者-消费者模型中, 消费者block在条件变量的wait方法中, 等待生产者通知. 这个时候发送停止请求, 会导致消费者线程无法停止. 以下是一个例子:

#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

int main() {
  std::queue<int> queue;
  std::mutex mtx;
  std::condition_variable cv;

  std::jthread consumer{[&](const std::stop_token& st) {
    while (!st.stop_requested()) {
      std::unique_lock<std::mutex> lock(mtx);
      cv.wait(lock, [&queue] { return !queue.empty(); });
      while (!queue.empty()) {
        int item = queue.front();
        queue.pop();
        std::cout << "Consumed: " << item << std::endl;
      }
    }
    std::cout << "Consumer stopped" << std::endl;
  }};

  // 等待1s确保消费者线程已经准备好
  std::this_thread::sleep_for(std::chrono::seconds(1));
  // 此时发出的停止请求不会立即停止消费者线程, 因为消费者线程可能在等待条件变量
  consumer.request_stop();
  return 0;
}

std::jthread 为了解决这个问题, 在条件变量中添加了对stop_token的支持. 但是限定了只能使用std::condition_variable_any类, 以下是一个例子:

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

int main() {
  std::queue<int> queue;
  std::mutex mtx;
  std::condition_variable_any cv;

  std::jthread consumer{[&](const std::stop_token& st) {
    std::stop_callback cb(
        st, [&] { std::cout << "Consumer stopped" << std::endl; });
    while (!st.stop_requested()) {
      std::unique_lock<std::mutex> lock(mtx);
      // 使用stop_callback注册的回调函数, 保证在线程停止时, 会唤醒条件变量
      if (!cv.wait(lock, st, [&queue] { return !queue.empty(); })) {
        return;
      }
      while (!queue.empty()) {
        int item = queue.front();
        queue.pop();
        std::cout << "Consumed: " << item << std::endl;
      }
    }
  }};

  std::this_thread::sleep_for(std::chrono::seconds(1));
  consumer.request_stop();
  return 0;
}

输出:

Consumer stopped

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部