在并发运行时中,任务是执行特定作业并通常与其他任务并行运行的工作单元。 任务可以分解为组织成任务组的其他更细化的任务。

编写异步代码,并希望在异步操作完成之后进行某种操作时,可使用任务。 例如,可以使用一个任务以异步方式从文件读取,然后使用另一个任务(延续任务,本文档稍后会对此进行说明)在数据可用之后处理数据。 相反,可以使用任务组将并行工作分解成较小的各部分。 例如,假设你有一个将剩余工作划分为两个分区的递归算法。 可以使用任务组并发运行这两个分区,然后等待划分的工作完成。

要将相同例程并行应用于集合的每个元素时,可使用并行算法(如 concurrency::parallel_for),而不是任务或任务组。

要点
  • 通过引用将变量传递到 Lambda 表达式时,必须保证该变量的生存期在任务完成之前一直保持;
  • 编写异步代码时可使用任务(concurrency::task 类)。 任务类使用 Windows 线程池作为其计划程序中,而不是并发运行时;
  • 要将并行工作分解成较小的各部分,然后等待这些较小部分完成时,可使用任务组(concurrency::task_group 类或 concurrency::parallel_invoke 算法);
  • 使用 concurrency::task::then 方法可创建延续。 延续是在另一个任务完成之后异步运行的任务。 可以连接任意数量的延续以形成异步工作链;
  • 基于任务的延续始终计划为在先行任务完成时执行,甚至是在先行任务取消或引发异常时执行;
  • 使用 concurrency::when_all 可创建在任务集的所有成员都完成之后完成的任务。 使用 concurrency::when_any 可创建在任务集的一个成员完成之后完成的任务;
  • 任务和任务组可以参与并行模式库 (PPL) 取消机制;
使用 Lambda 表达式

由于其语法简洁,因此 lambda 表达式是定义由任务和任务组执行的工作的常用方法。 下面是一些使用提示:

  • 因为任务通常在后台线程上运行,所以在 Lambda 表达式中捕获变量时请注意对象生存期。 如果通过值捕获变量,则会在 lambda 体中创建该变量的副本。 通过引用捕获时,不创建副本。 因此,请确保通过引用捕获的任何变量的生存期长于使用它的任务;
  • Lambda 表达式将 lambda 表达式传递给任务时,不捕获通过引用在堆栈上分配的变量;
  • 应明确在 lambda 表达式中捕获的变量,以便可以确定通过值与通过引用捕获的内容。 因此我们建议不要将 [=] 或 [&] 选项用于 lambda 表达式;

一种常用模式是将延续链中的一个任务分配给变量,而另一个任务读取该变量。 无法通过值捕获,因为每个延续任务会保存变量的不同副本。 对于堆栈分配的变量,也无法通过引用捕获,因为变量可能不再有效。

若要解决此问题,请使用智能指针(如 std::shared_ptr)来包装变量,并通过值传递智能指针。 这样,基础对象便可以进行分配和读取,并且生存期会长于使用它的任务。 即使在变量是 Windows 运行时对象的指针或引用计数的句柄 (^) 时,也可使用此技术。 下面是一个基本示例:

// lambda-task-lifetime.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>
#include <string>

using namespace concurrency;
using namespace std;

task<wstring> write_to_string()
{
    // Create a shared pointer to a string that is 
    // assigned to and read by multiple tasks.
    // By using a shared pointer, the string outlives
    // the tasks, which can run in the background after
    // this function exits.
    auto s = make_shared<wstring>(L"Value 1");

    return create_task([s] 
    {
        // Print the current value.
        wcout << L"Current value: " << *s << endl;
        // Assign to a new value.
        *s = L"Value 2";

    }).then([s] 
    {
        // Print the current value.
        wcout << L"Current value: " << *s << endl;
        // Assign to a new value and return the string.
        *s = L"Value 3";
        return *s;
    });
}

int wmain()
{
    // Create a chain of tasks that work with a string.
    auto t = write_to_string();

    // Wait for the tasks to finish and print the result.
    wcout << L"Final value: " << t.get() << endl;
}

/* Output:
    Current value: Value 1
    Current value: Value 2
    Final value: Value 3
*/
task 类

可以使用 concurrency::task 类将任务组合成相关操作集。 此组合模型通过延续来支持。 延续使代码可以在前面(或先行)任务完成时执行。 先行任务的结果会作为输入传递给一个或多个延续任务。 先行任务完成时,在等待它的所有延续任务都计划进行执行。 每个延续任务都会收到先行任务结果的副本。 这些延续任务进而也可能是其他延续的先行任务,从而形成任务链。 延续可帮助创建任务间具有特定依赖关系的任意长度的任务链。 此外,任务还可以在其他任务开始之前或是在其他任务运行期间以协作方式参与取消。 有关此取消模型的详细信息,请参阅PPL 中的取消操作。

task 是模板类。 类型参数 T 是由任务生成的结果的类型。 如果任务不返回值,则此类型可以是 void。 T 不能使用 const 修饰符。

创建任务时,需提供执行任务体的工作函数。 此工作函数采用 lambda 函数、函数指针或函数对象的形式。 若要等待任务完成而不获取结果,请调用 concurrency::task::wait 方法。 task::wait 方法会返回一个 concurrency::task_status 值,该值描述任务是已完成还是已取消。 若要获取任务的结果,请调用 concurrency::task::get 方法。 此方法调用 task::wait 以等待任务完成,因此会在结果可用之前阻止当前线程的执行。

下面的示例演示如何创建任务、等待其结果并显示其值。 本文档中的示例使用 lambda 函数,因为它们提供更简洁的语法。 不过你也可以在使用任务时使用函数指针和函数对象。

// basic-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
    // Create a task.
    task<int> t([]()
    {
        return 42;
    });

    // In this example, you don't necessarily need to call wait() because
    // the call to get() also waits for the result.
    t.wait();

    // Print the result.
    wcout << t.get() << endl;
}

/* Output:
    42
*/

使用 concurrency::create_task 函数时,可以使用 auto 关键字而不是声明类型。 例如,请考虑创建和打印单位矩阵的以下代码:

// create-task.cpp
// compile with: /EHsc
#include <ppltasks.h>
#include <string>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

int wmain()
{
    task<array<array<int, 10>, 10>> create_identity_matrix([]
    {
        array<array<int, 10>, 10> matrix;
        int row = 0;
        for_each(begin(matrix), end(matrix), [&row](array<int, 10>& matrixRow) 
        {
            fill(begin(matrixRow), end(matrixRow), 0);
            matrixRow[row] = 1;
            row++;
        });
        return matrix;
    });

    auto print_matrix = create_identity_matrix.then([](array<array<int, 10>, 10> matrix)
    {
        for_each(begin(matrix), end(matrix), [](array<int, 10>& matrixRow) 
        {
            wstring comma;
            for_each(begin(matrixRow), end(matrixRow), [&comma](int n) 
            {
                wcout << comma << n;
                comma = L", ";
            });
            wcout << endl;
        });
    });

    print_matrix.wait();
}
/* Output:
    1, 0, 0, 0, 0, 0, 0, 0, 0, 0
    0, 1, 0, 0, 0, 0, 0, 0, 0, 0
    0, 0, 1, 0, 0, 0, 0, 0, 0, 0
    0, 0, 0, 1, 0, 0, 0, 0, 0, 0
    0, 0, 0, 0, 1, 0, 0, 0, 0, 0
    0, 0, 0, 0, 0, 1, 0, 0, 0, 0
    0, 0, 0, 0, 0, 0, 1, 0, 0, 0
    0, 0, 0, 0, 0, 0, 0, 1, 0, 0
    0, 0, 0, 0, 0, 0, 0, 0, 1, 0
    0, 0, 0, 0, 0, 0, 0, 0, 0, 1
*/

可以使用 create_task 函数创建等效操作。

auto create_identity_matrix = create_task([]
{
    array<array<int, 10>, 10> matrix;
    int row = 0;
    for_each(begin(matrix), end(matrix), [&row](array<int, 10>& matrixRow) 
    {
        fill(begin(matrixRow), end(matrixRow), 0);
        matrixRow[row] = 1;
        row++;
    });
    return matrix;
});

如果在任务执行期间引发异常,则运行时会在对 task::get 或 task::wait 或是基于任务的延续的后续调用中封送该异常。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部