admin管理员组

文章数量:1587995

C++并发编程学习笔记二

  • 4. 同步并发操作
    • 4.1 同步的方案
      • 4.1.1 等待条件达成 | condition variable
    • 4.2 使用期望future等待一次性事件
      • 4.2.1 带返回值的后台任务async
        • async()
        • get()

C++并发编程学习笔记一

4. 同步并发操作

4.1 同步的方案

  1. 持续检查共享数据标志
    缺点:资源持续消耗

  2. 周期性间歇的检查共享数据标志
    缺点:无法正确选择是键盘

  3. condition variable唤醒(优先选择)

condition variable定义:
一个条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行)终止的线程将会向等待着的线程广播“条件达成”的信息

4.1.1 等待条件达成 | condition variable

两套实现方式:

  1. condition_variable
  2. condition_variable_any

差异:两者都需要与一个互斥量一起才能工作(互斥量是为了同步),前者仅限于于mutex一起工作,后者可以和任何满足最低标准的互斥量一起工作,因此加上了_any的后缀。

std::mutex mut;
std::queue<data_chunk> data_queue; 
std::condition_variable data_cond;

// func1
void data_preparation_thread() {
	while(more_data_to_prepare()) {
		data_chunk const data=prepare_data();
		std::lock_guard<std::mutex> lk(mut);	// 1
		data_queue.push(data); // 2
		data_cond.notify_one(); // 3
	}
}

fun1分析:函数作用:只要有更多的数据等待入队,就会先上锁1,2再入队,3notify_one

lock_guard:会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个一锁的互斥量总是会被正确的解锁
看下源码:

unique_lock

  1. 和lock_guard一样通过,在析构函数中进行unlock()的
  2. 比lock_guard更灵活一点,但是效率会第一点,内存占用也会大一点。
  3. 具有自己的成员函数来更加灵活进行锁操作

std::unique_lock

  • unique_lock也可以加std::adopt_lock参数,表示互斥量已经被lock,不需要再重复lock。该互斥两必须已经lock才可以使用该参数。
    std::try_to_lock
  • 可以避免一些不必要的等待,会判断当前mutex能否被lock,如果不能被lock,可以先去执行其他代码
  • 这个和adopt不同,不需要自己提前加锁。举个例子来说就是如果有一个线程被lock,而且执行时间很长,那么另一个线程一般会被阻塞在那里,反而会造成时间的浪费。那么使用了try_to_lock后,如果被锁住了,它不会在那里阻塞等待,它可以先去执行其他没有被锁的代码。
// func2
void data_processing_thread() {
	while(true) {
		std::unique_lock<std::mutex> lk(mut); // 4
		data_cond.wait(lk,[]{return !data_queue.empty();}); // 5
		data_chunk data = data_queue.front();
		data_queue.pop();
		lk.unlock(); // 6
		process(data);
		if(is_last_chunk(data)) break;
	}
}

分析:func2:处理数据的线程

  • wait(): 当满足后面的lambda函数时,将会解决互斥量,并且将该线程阻塞/等待
    在调用wait的过程中,一个条件变量,只有在互斥量被锁定时,会去检查给定条件若干次。当且仅当提供测试条件的函数返回true时,它就会立即返回。当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,这就是所谓的“伪唤醒”(spurious wakeup)。

  • unique_lock:
    当func1调用data_crond.notify_one的时候,func2的线程从阻塞/等待中唤醒,重新换取互斥锁,并且对条件再次检查,再条件满足的情况下,从wait()返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并且重新开始等待。

  • 为什么用unique_lock而不用lock_guard
    等待中的线程必须在等待期间解锁互斥量,并在这之后对互斥量再次上锁,而std::lock_guard

4.2 使用期望future等待一次性事件

期望(future):未来发生一次性事件。

异步操作:无法立即获得结果,因此需要future来协助在未来某个时间点进行获取。
所以:future是用来访问异步操作的结果,也正是题意:使用期望future等待一次性事件。

http://www.cplusplus/reference/future/future/

future的状态:
作用:标记(future_status)来异步操作的结果。
future_status有三种状态:

  • deferred:异步操作还没开始
  • ready:异步操作已经完成
  • timeout:异步操作超时


  • 获取future结果有三种方式:get、wait、wait_for、wait_until
  • 其中get等待异步操作结束并返回结果
  • 阻塞:wait只是等待异步操作完成,没有返回值
  • 异步阻塞:wait_for是超时等待返回结果, 返回值是future_status
    • Waits for the shared state to be ready for up to the time specified by rel_time.
  • 异步阻塞:wait_until也是超时等待返回结果,返回值是future_status
    • Waits for the shared state to be ready, at most until abs_time.

wait_for 和 wait_until注意:

  1. 如果the shraed state is not ready, wait_for/until blocks the calling thread.
  2. If the shared state contains a deferred function (such as future objects returned by async), the function does not block, returning immediately with a value of future_status::deferred. 如果共享状态包含一个deferred函数(例如异步返回的对象),则该函数不阻塞,立即返回future_status的值::deferred.

examples:

// future::wait_for
#include <iostream>       // std::cout
#include <future>         // std::async, std::future
#include <chrono>         // std::chrono::milliseconds

// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
  for (int i=2; i<x; ++i) if (x%i==0) return false;
  return true;
}

int main ()
{
  // call is_prime asynchronously:
  // future<>的类型, 是后面被调用函数的类型
  std::future<bool> fut = std::async (is_prime,700020007); 
  // do something while waiting for function to set future:
  std::cout << "checking, please wait";
  std::chrono::milliseconds span (100);
  while (fut.wait_for(span)==std::future_status::timeout)
    std::cout << '.';

  bool x = fut.get();
  std::cout << "\n700020007 " << (x?"is":"is not") << " prime.\n";

  return 0;
}

两种类型模板实现:
3. unqiue_futures:其实例仅能关联一个事件
4. shared_futures:其实例能关联多个事件

4.2.1 带返回值的后台任务async

  • std::thread 执行的任务不能有返回值,因此需要future来解决
  • 因为 std::thread 并不提供直接接收返回值的机制。这里就需要 std::async 函数模板

http://www.cplusplus/reference/future/async/

async()

用async启动一个新的异步线程运行绑定的任务,返回值为future对象。future对象用来存储async异步任务执行结果。

与 std::thread 对象等待运行方式的不同, std::async 会返回一个 std::future 对象,这个对象持有最终计算出来的结果。

get()

当你需要这个值时,你只需要调用这个对象的get()成员函数;并且直到“期望”状态为就绪的情况下,线程才会阻塞;之后,返回计算结果。

async:

第二个参数fn: a pointer to function, 看到源码时移动构造函数。

第一个参数:

launch policy: 有三种策略

  1. async:启动一个新线程来调用第二个参数的function, 返回一个future值作为function的共享状态;第二个参数是function name,后续是function name的参数;
  2. deferred,直到返回值future被访问(wait / get)才调用function,在被wait/get的时候才会被调用。当这个函数返回了,返回值future是is ready.
  3. async | deferred, 自动选吧。取决于系统或者库的实现。

如果没有第一个参数,就是选第三个 async | deferred, 自动选吧


例子:可以看到fut.get() 是一个阻塞的方法,会等到get结束
看下get定义:
Get value

Returns the value stored in the shared state (or throws its exception) when the shared state is ready.
当这个shared state is ready, get()才会返回

If the shared state is not yet ready (i.e., the provider has not yet set its value or exception), the function blocks the calling thread and waits until it is ready.
如果the shared state 没有ready, 则函数get()会阻塞正在调用的线程。

// async example
#include <iostream>       // std::cout
#include <future>         // std::async, std::future
// a non-optimized way of checking for prime numbers:
bool is_prime (int x) {
  std::cout << "Calculating. Please, wait...\n";
  for (int i=2; i<x; ++i) if (x%i==0) return false;
  return true;
}

int main () {
  // call is_prime(313222313) asynchronously:
  std::future<bool> fut = std::async (is_prime,313222313);

  // 等待the shared state is ready.
  // the function blocks the calling thread and waits until it is ready.
  fut.wait()

  bool ret = fut.get();      // waits for is_prime to return
  if (ret) std::cout << "It is prime!\n";
  else std::cout << "It is not prime.\n";

  return 0;
}

ready是什么意思?
the provider has not yet set its value or exception

Once the shared state is ready, the function unblocks and returns (or throws) releasing its shared state. This makes the future object no longer valid: this member function shall be called once at most for every future shared state.
一旦the shared state is ready了, 只能获取这个共享的future状态一次,如果两次get,则立刻给你报错。

  • 使用 std::future 从异步任务中获取返回值:
#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main() {
	std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
	do_other_stuff();
	std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}
  • std::async 允许向函数传递额外的参数。使用 std::async 向函数传递参数
#include <string>
#include <future>
struct X {
	void foo(int,std::string const&);
	std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello"); // 调用p->foo(42, "hello"),p是指向x的指针
auto f2=std::async(&X::bar,x,"goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本
struct Y {
	double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
auto f4=std::async(std::ref(y),2.718); // 调用y(2.718)
X baz(X&);
std::async(baz,std::ref(x)); // 调用baz(x)
class move_only {
	public:
	move_only();
	move_only(move_only&&)
	move_only(move_only const&) = delete;
	move_only& operator=(move_only&&);
	move_only& operator=(move_only const&) = delete;
	void operator()();
};
auto f5=std::async(move_only()); 
// 调用tmp(),tmp是通过std::move(move_only())构造得到
  • 在函数调用之前,向 std::async 传递一个额外参数。
  • std::launch::async: 表明函数必须在其所在的独立线程上执行
  • std::launch::defered: 函数调用被延迟到wait()或get()函数调用时才执行.
  • std::launch::deferred | std::launch::async 表明实现可以选择这两种方式的一种。最后一个选项是默认的。当函数调用被延迟,它可能不会在运行了。如下所示:
auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行
auto f7=std::async(std::launch::deferred,baz,std::ref(x)); // 在wait()或get()调用时执行
auto f8=std::async(
	std::launch::deferred | std::launch::async,
	baz,std::ref(x)); // 实现选择执行方式
auto f9=std::async(baz,std::ref(x));
f7.wait(); // 调用延迟函数

[1] 《C++11并发编程》

本文标签: 学习笔记ProgrammingParallelingfuture