C++20 闩与屏障
闩 (latch) 与屏障 (barrier) 是线程协调机制,允许任何数量的线程阻塞直至期待数量的线程到达。闩不能重复使用,而屏障则可以。
std::latch
:单次使用的线程屏障std::barrier
:可复用的线程屏障
它们定义在标头 <latch>
与 <barrier>
。
与信号量类似,屏障也是一种古老而广泛应用的同步机制。许多系统 API 提供了对屏障机制的支持,例如 POSIX 和 Win32。此外,OpenMP 也提供了屏障机制来支持多线程编程。
std::latch
“闩”,这个字其实个人觉得是不常见,“门闩” 是指门背后用来关门的棍子。好了好了,不用在意,在 C++ 中就是先前说的:单次使用的线程屏障。
latch
类维护着一个 std::ptrdiff_t
类型的计数1,且只能减少计数,无法增加计数。在创建对象的时候初始化计数器的值。线程可以阻塞,直到 latch 对象的计数减少到零。由于无法增加计数,这使得 latch
成为一种单次使用的屏障。
std::latch work_start{ 3 };
void work(){
std::cout << "等待其它线程执行\n";
work_start.wait(); // 等待计数为 0
std::cout << "任务开始执行\n";
}
int main(){
std::jthread thread{ work };
std::this_thread::sleep_for(3s);
std::cout << "休眠结束\n";
work_start.count_down(); // 默认值是 1 减少计数 1
work_start.count_down(2); // 传递参数 2 减少计数 2
}
运行结果:
等待其它线程执行
休眠结束
任务开始执行
在这个例子中,通过调用 wait
函数阻塞子线程,直到主线程调用 count_down
函数原子地将计数减至 0
,从而解除阻塞。这个例子清楚地展示了 latch
的使用,其逻辑比信号量更简单。
由于 latch
的计数不可增加,它的使用通常非常简单,可以用来划分任务执行的工作区间。例如:
std::latch latch{ 10 };
void f(int id) {
//todo.. 脑补任务
std::this_thread::sleep_for(1s);
std::cout << std::format("线程 {} 执行完任务,开始等待其它线程执行到此处\n", id);
latch.arrive_and_wait();
std::cout << std::format("线程 {} 彻底退出函数\n", id);
}
int main() {
std::vector<std::jthread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(f,i);
}
}
运行测试。
arrive_and_wait
函数等价于:count_down(n); wait();
。也就是减少计数 + 等待。这意味着
必须等待所有线程执行到 latch.arrive_and_wait();
将 latch 的计数减少至 0
才能继续往下执行。这个示例非常直观地展示了如何使用 latch
来划分任务执行的工作区间。
由于 latch
的功能受限,通常用于简单直接的需求,不少情况很多同步设施都能完成你的需求,在这个时候请考虑使用尽可能功能最少的那一个。
- 使用功能尽可能少的设施有助于开发者阅读代码理解含义。如果使用的是一个功能丰富的设施,可能就无法直接猜测其意图。
std::barrier
上节我们学习了 std::latch
,本节内容也不会对你构成难度。
std::barrier
和 std::latch
最大的不同是,前者可以在阶段完成之后将计数重置为构造时传递的值,而后者只能减少计数。我们用一个非常简单直观的示例为你展示:
std::barrier barrier{ 10,
[n = 1]()mutable noexcept {std::cout << "\t第" << n++ << "轮结束\n"; }
};
void f(int start, int end){
for (int i = start; i <= end; ++i) {
std::osyncstream{ std::cout } << i << ' ';
barrier.arrive_and_wait(); // 减少计数并等待 解除阻塞时就重置计数并调用函数对象
std::this_thread::sleep_for(300ms);
}
}
int main(){
std::vector<std::jthread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(f, i * 10 + 1, (i + 1) * 10);
}
}
可能的运行结果:
1 21 11 31 41 51 61 71 81 91 第1轮结束
12 2 22 32 42 52 62 72 92 82 第2轮结束
13 63 73 33 23 53 83 93 43 3 第3轮结束
14 44 24 34 94 74 64 4 84 54 第4轮结束
5 95 15 45 75 25 55 65 35 85 第5轮结束
6 46 16 26 56 96 86 66 76 36 第6轮结束
47 17 57 97 87 67 77 7 27 37 第7轮结束
38 8 28 78 68 88 98 58 18 48 第8轮结束
9 39 29 69 89 99 59 19 79 49 第9轮结束
30 40 70 10 90 50 60 20 80 100 第10轮结束
注意输出的规律,第一轮每个数字最后一位都是 1
,第二轮每个数字最后一位都是 2
……以此类推,因为我们分配给每个线程的输出任务就是如此,然后利用了屏障一轮一轮地打印。
arrive_and_wait
等价于 wait(arrive());
。原子地将期待计数减少 1,然后在当前阶段的同步点阻塞直至运行当前阶段的阶段完成步骤。
arrive_and_wait()
会在期待计数减少至 0
时调用我们构造 barrier 对象时传入的 lambda 表达式,并解除所有在阶段同步点上阻塞的线程。之后重置期待计数为构造中指定的值。屏障的一个阶段就完成了。
- 并发调用
barrier
除了析构函数外的成员函数不会引起数据竞争。
另外你可能注意到我们使用了 std::osyncstream
,它是 C++20 引入的,此处是确保输出流在多线程环境中同步,免除数据竞争,而且将不以任何方式穿插或截断。
虽然
std::cout
的operator<<
调用是线程安全的,不会被打断,但多个operator<<
的调用在多线程环境中可能会交错,导致输出结果混乱,使用std::osyncstream
就可以解决这个问题。开发者可以尝试去除std::osyncstream
直接使用std::cout
,效果会非常明显。
使用 arrive
或 arrive_and_wait
减少的都是当前屏障计数,我们称作“期待计数”。不管如何减少计数,当完成一个阶段,就重置期待计数为构造中指定的值了。
标准库还提供一个函数 arrive_and_drop
可以改变重置的计数值:它将所有后继阶段的初始期待计数减少一,当前阶段的期待计数也减少一。
不用感到难以理解,我们来解释一下这个概念:
std::barrier barrier{ 4 }; // 初始化计数为 4 完成阶段重置计数也是 4
barrier.arrive_and_wait(); // 当前计数减 1,不影响之后重置计数 4
barrier.arrive_and_drop(); // 当前计数与重置之后的计数均减 1 完成阶段会重置计数为 3
arrive_and_drop
可以用来控制在需要的时候,让一些线程退出同步,如:
std::atomic_int active_threads{ 4 };
std::barrier barrier{ 4,
[n = 1]() mutable noexcept {
std::cout << "\t第" << n++ << "轮结束,活跃线程数: " << active_threads << '\n';
}
};
void f(int thread_id){
for (int i = 0; i < 5; ++i) {
std::osyncstream{ std::cout } << "线程 " << thread_id << " 输出: " << i << '\n';
if (i == 2 && thread_id == 2) { // 假设线程ID为2的线程在输出完2后退出
std::osyncstream{ std::cout } << "线程 " << thread_id << " 完成并退出\n";
--active_threads; // 减少活跃线程数
barrier.arrive_and_drop(); // 减少当前计数 1,并减少重置计数 1
return;
}
barrier.arrive_and_wait(); // 减少计数并等待,解除阻塞时重置计数并调用函数对象
}
}
int main(){
std::vector<std::jthread> threads;
for (int i = 1; i <= 4; ++i) {
threads.emplace_back(f, i);
}
}
运行测试。
初始线程有 4 个,线程 2 在执行了两轮同步之后便直接退出了,调用 arrive_and_drop
函数,下一个阶段的计数会重置为 3
,也就是只有三个活跃线程继续执行。查看输出结果,非常的直观。
这样,arrive_and_drop
的作用就非常明显了,使用也十分的简单。
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » C++20 闩与屏障
发表评论 取消回复