Linux线程同步与互斥(二)/生产消费者模型

编程入门 行业动态 更新时间:2024-10-09 19:18:44

Linux<a href=https://www.elefans.com/category/jswz/34/1771240.html style=线程同步与互斥(二)/生产消费者模型"/>

Linux线程同步与互斥(二)/生产消费者模型

⭐前言:本文会先后讲解生产消费者模型、条件变量和基于阻塞队列的生产消费者模型。

1.生产消费者模型

什么是生产消费者模型?

认识生产消费者模型

使用学生(消费者),超市,供货商(生产者)来举一个例子。

学生是典型的消费者,供货商是典型的生产者。假设学生有泡面、火腿肠、玩具等等的需求,而供货商会生产尽可能覆盖学生需求的商品,但是一般并不会直接卖给学生,而是供货给超市,从而在超市里做买卖。

如果是春节期间,或者是什么特殊时期,供货商暂停生产商品,但由于超市已经进了很大一批货,生产者停止生产压根不会影响到消费者消费。这里,就是生产与消费过程的互不干扰,称之为解耦。而超市作为大量进货的一方,一般不会进很少货,而是进很多,达到满足消费需求,因此可以看作是临时保存商品的地方,我们对应的是计算机中的缓冲区

接下来举一个反例,即强耦合关系的生产消费者来加深理解->在我们平时写的代码中,比如实现一个加法函数Add和调用Add函数的main函数。在调用Add函数的时候,main函数是在等待阶段的,并且Add函数是会直接影响到main函数的。

因此,从上面例子来正确认识一下生产消费者模型:

超市(缓冲区)作为消费者消费的地方,生产者放资源的地方,它必须要让消费者和生产者都看到,即它是一份共享资源!这部分共享资源会涉及到多线程访问,那就必须被保护起来。

生产者和消费者是线程,顾名思义,作为生产者的线程用于向缓冲区存入数据,消费者线程向缓冲区拿数据。

因此,生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者消费者关系

生产者和消费者,都是对应着一个或多个线程。

生产者和生产者之间的关系是互斥关系,就好像金华火腿和双汇火腿不能同时摆放在一个货架上,否则会很乱,消费者就不好消费了,一定是先摆放好一种火腿,再去摆放好另外一种火腿。

消费者和消费者之间的关系也是互斥关系。很简单的一个例子:黄牛抢票。

消费者和生产者的关系也是有互斥关系。比如超市里面有一个展架,上面刚放了生产者提供的火腿,但是还没贴价格标签,但是消费者已经来想要消费了,很明显价格还没贴是不能消费先的,因此需要互斥关系,并且这能保证资源是安全的。

消费者和生产者的关系还有同步关系。超市会根据火腿的售卖量来决定入货的数量,即向消费者卖了多少火腿,就向生产者要多少火腿,然后火腿有货的时候,就会通知消费者来消费,缺火腿的时候,就会通知生产者进货,使其有顺序性。从而达到了消费者和生产者的同步与互斥关系。

总结:“321”原则

3种关系:①生产者和生产者的互斥关系。②消费者和消费者的互斥关系。③生产者和消费者的互斥(互斥是为了保证共享资源的安全性)、同步关系。

2种角色:生产者线程和消费者线程。

1个交易场所:一段特定结构的缓冲区。

生产消费者模型的优点

解耦    支持并发     支持忙闲不均

如何维护生产消费者模型中生产者和消费者的同步关系?

现在我们有了消费者线程,也有了生产者线程,并且它们都是线程安全的,加了互斥锁。

生产者线程在生产数据加入到缓冲区中,是对消费者线程互斥的,也就是说生产者线程在生产的时候,会对缓冲区加锁,只有将生产后的数据加入到缓冲区中,并且解锁之后,理论上消费者才可以去消费。但是生产者的优先级高,即使缓冲区中已经满了,但是生产者依旧不断地加锁----到缓冲区中判断是否满了,如果没满,则放入数据,如果满了则解锁离开。离开之后,又有生产者线程来了......这样就导致消费者线程永远进不来。这就只保证了生产者和消费者之间的互斥,保证了共享资源的安全性,但是没有维护好两者的同步关系!

因此我们需要引入条件变量来维护同步关系!

2.条件变量

什么是条件变量

条件变量就是一种变量,它包含了条件变量的状态和队列的指针,它可以链接不满足条件,需要等待的线程。待线程满足条件,就可以将其唤醒拿出来,放到CPU上等待队列。

这里我们可以看到,所有的线程都必须要看到这个条件变量,因此条件变量需要定义为全局的。

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,不能让它申请锁了,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

用条件变量的逻辑为:

加锁----->判断生产和消费条件是否满足----->解锁。其中判断条件也是访问临界资源的一种,需要保护起来。当条件满足,则做相应的事,如果不满足,则解锁之后,不再申请锁,并且将自己阻塞挂起等待。

条件变量函数

条件变量的类型:

pthread_cond_t

①初始化

int pthread_cond_init(pthread_cond_t* restrict cond, const pthread_condattr_t* restrict attr);
参数:cond:要初始化的条件变量attr:NULL
返回值:成功返回0,失败返回错误码。

②销毁

int pthread_cond_destroy(pthread_cond_t *cond)

③等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:cond:要在这个条件变量上等待mutex:互斥量,后面详细解释

④等待唤醒

//唤醒一批线程
int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒一个线程
int pthread_cond_signal(pthread_cond_t *cond);

下面通过条件变量的简单案例,来看看条件变量是如何工作的。

通过创建两个线程,让两个线程交替执行去抢票,通过条件变量和互斥锁,让两个线程有顺序地执行。两个线程执行的时候,先会在条件变量中排队,等待主线程的唤醒,依次执行。

#include<iostream>
#include<string>
#include<pthread.h>
#include<unistd.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量
int tickets = 1000;
void* start_routine(void *args)
{std::string name = static_cast<const char*>(args);while(true){pthread_mutex_lock(&mutex);//加锁//每一个线程进来后先等等pthread_cond_wait(&cond,&mutex);//if(tickets)std::cout<<name<<"-> "<<tickets<<std::endl;tickets--;pthread_mutex_unlock(&mutex);//解锁}
}int main()
{//通过条件变量控制线程的执行//创建两个线程,让两个线程交替执行pthread_t t1,t2;pthread_create(&t1,nullptr,start_routine,(void*)"thread 1");pthread_create(&t2,nullptr,start_routine,(void*)"thread 2");while(true){//主线程,控制节奏,在其它线程在条件变量的等待的时候,一次唤醒一个线程进行运行sleep(2);//每隔2秒唤醒一下pthread_cond_signal(&cond);//预期每一个线程在执行的时候,一定会有非常强的顺序性std::cout<<"main thread wakeup one thread..."<<std::endl;}//线程等待pthread_join(t1,nullptr);pthread_join(t2,nullptr);return 0;
}

可以看到结果,顺序性很强,都是1->2->1->2这样的执行顺序。

我们再创建多个线程,跟上面一样,让多个线程在执行的时候,先在条件变量中排队,等待主线程的唤醒,然后依次执行。

这里的唤醒是使用pthread_cond_signal,每次唤醒一个线程。

#include<iostream>
#include<string>
#include<pthread.h>
#include<unistd.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量
int tickets = 1000;
void* start_routine(void *args)
{std::string name = static_cast<const char*>(args);while(true){pthread_mutex_lock(&mutex);//加锁//每一个线程进来后先等等pthread_cond_wait(&cond,&mutex);std::cout<<name<<"-> "<<tickets<<std::endl;tickets--;pthread_mutex_unlock(&mutex);//解锁}
}int main()
{//通过条件变量控制线程的执行//创建多个线程,让多个线程交替执行pthread_t t[5];for(int i = 0;i<5;i++){char *name = new char[64];snprintf(name,64,"thread %d",i+1);pthread_create(t+i,nullptr,start_routine,name);}while(true){//主线程,控制节奏,在其它线程在条件变量的等待的时候,一次唤醒一个线程进行运行sleep(1);//每隔1秒唤醒一下pthread_cond_signal(&cond);//预期每一个线程在执行的时候,一定会有非常强的顺序性std::cout<<"main thread wakeup one thread..."<<std::endl;}for(int i = 0;i < 5;++i){pthread_join(t[i],nullptr);}return 0;
}

 使用pthread_cond_broadcast,一次唤醒一批线程。

        //pthread_cond_signal(&cond);pthread_cond_broadcast(&cond);

3.基于阻塞队列的生产消费者模型

终于,我们知道了什么叫做生产消费者模型,什么是条件变量,并且简单了解了条件变量如何去使用了。

现在,我们左手一个生产消费者模型,右手一个条件变量和互斥量,是时候徒手搓氢弹....呃....基于阻塞队列的生产消费者模型了!

什么是基于阻塞队列的生产消费者模型?

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

C++queue模拟阻塞队列的生产消费者模型

单生产者单消费者示例代码:

以单生产者,单消费者,来进行学习。

头文件BlockQueue.hpp:

#pragma once#include <iostream>
#include<queue>
#include<pthread.h>
using namespace std;
const int gmaxcap = 5;//设置默认缺省值为5,方便进行测试
template<class T>
class BlockQueue
{
public:BlockQueue(const int &_maxcap = gmaxcap):_maxcap(gmaxcap){pthread_mutex_init(&_mutex,nullptr);//初始化锁pthread_cond_init(&_pcond,nullptr);//初始化条件变量pthread_cond_init(&_ccond,nullptr);}//放数据void push(const T &in)//输入型参数,一般用const 和引用&修饰{//线程进来先加锁,生产者pthread_mutex_lock(&_mutex);//在放入数据的前,需要判断阻塞队列是否满了if(is_full()){//满了不能生产,那就去对应的条件变量中等待pthread_cond_wait(&_pcond,&_mutex);}//没满,就放入数据_q.push(in);//此时我们可以绝对保证,阻塞队列里面一定有数据,至少有一个//此时可以让消费者来消费,即唤醒消费者pthread_cond_signal(&_ccond);//这里可以加一定的策略,待队列的元素超过三分之一再唤醒消费者//最后解锁pthread_mutex_unlock(&_mutex);}//拿数据void pop(T *out)//输出型参数,一般用*{pthread_mutex_lock(&_mutex);//消费者和生产者用的是同一把锁,因为必须让生产者和消费者互斥//判断队列是否空的,如果空,那就等待if(is_empty()){pthread_cond_wait(&_ccond,&_mutex);}//走到这里,我们能保证一定不为空//拿数据*out =_q.front();_q.pop();//绝对能保证,阻塞队列里面至少有一个空的位置//此时可以将生产者唤醒,去生产pthread_cond_signal(&_pcond);//这里也可以有一定的策略pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}private:
//判断阻队列是否为空bool is_empty(){return _q.empty();}//判断阻队列是否满了bool is_full(){return _q.size()==_maxcap;}
private:std::queue<T> _q;int _maxcap;//阻塞队列中最多能有多少个元素//_q这个队列本身不是线程安全的,因此我们保护起来pthread_mutex_t _mutex;//当队列满了之后,生产者进入休眠,队列空,消费者进入休眠,因此定义条件变量pthread_cond_t _pcond;//生产者对应的条件变量pthread_cond_t _ccond;//消费者对应的条件变量
};

①生产速度比消费速度慢,那么会出现生产一个,消费一个这样的情况,因为会阻塞嘛。

#include <ctime>
#include <sys/types.h>
#include <unistd.h>
using namespace std;
#include "BlockQueue.hpp"
void* consumer(void *_bq)
{BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(_bq);while(true){//消费活动int data;bq->pop(&data);std::cout<<"消费数据: "<<data<<std::endl;}return nullptr;}void* productor(void *_bq)
{BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(_bq);while(true){//生产活动int data = rand()%10 +1;//1到10的数据bq->push(data);std::cout<<"生产数据: "<<data<<std::endl;sleep(1);//让生产速度比消费速度慢一点}return nullptr;}
int main()
{srand((unsigned long)time(nullptr)^getpid());//随机生成数据据BlockQueue<int>* bq = new BlockQueue<int>();//阻塞队列pthread_t c,p;//消费者和生产者线程pthread_create(&c,nullptr,consumer,bq);//消费者 pthread_create(&p,nullptr,productor,bq);//生产者pthread_join(c,nullptr);pthread_join(p,nullptr);return 0; 
}

②生产速度比消费速度快,那么一开始会生产出好几个,然后会生产一下,消费一下。接着我们可以通过结果看出,消费的都是先生产出来的。

通过上面的代码和结果,我们很明显地感受到了生产者和消费者之间的协同,也就是同步了!

接下来我们完善这一份代码和一些细节的说明:

⭐细节1:我们在放入数据或拿数据的时候,是添加了互斥锁的!也就是说,线程在拿到锁后,进入等待的时候,是拿着锁一起等待的!那么这就意味着,锁被拿走了,谁也无法进入这个临界区了!

可事实上并不是这样,因为pthread_cond_wait这个函数的第二个参数,必须是我们正在使用的互斥锁,该函数会以原子性的方式将锁释放,并且将自己(线程)挂起,在被唤醒返回的时候,会自动重新获取我们传入的锁。

⭐细节2:如果生产者有10个,此时的队列是满的,只有一个消费者。那么消费者在消费后,阻塞队列就有一个位置空了出来,如果我们让一批生产者唤,而空间只有1个,那么在放入数据的时候,就有很大的可能会放入很多个数据!因此,我们将放入数据的操作中的判空操作,从if语句改为while循环,生产者醒来之后,再判断一下队列是否为空或者为满!同样的道理,消费者拿数据也一样,将if改为while循环。

⭐细节3:pthread_cond_signal这个函数,可以放在临界区内部,也可以放在外部。

⭐细节4:上面的代码中,只有打印1,2,3,4这样的数据,实在有点捞,我们可以改一下代码,让生产者给消费者派发任务!

派发任务Task.hpp:

#pragma once
#include <iostream>
#include <functional>
#include <cstdio>
using namespace std;class Task
{using func_t = std::function<int(int,int,char)>;// typedef std::function<int(int,int)> func_t;
public:Task(){}Task(int x, int y,char op, func_t func):_x(x), _y(y),_op(op), _callback(func){}string operator()(){int result = _callback(_x, _y,_op);char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);return buffer;}string toTaskString(){char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);return buffer;}
private:int _x;int _y;char _op;func_t _callback;
};

修改后的代码: 

头文件BlockQueue.hpp:

#include <iostream>
#include<queue>
#include<pthread.h>
using namespace std;
const int gmaxcap = 5;//设置默认缺省值为5,方便进行测试
template<class T>
class BlockQueue
{
public:BlockQueue(const int &_maxcap = gmaxcap):_maxcap(gmaxcap){pthread_mutex_init(&_mutex,nullptr);//初始化锁pthread_cond_init(&_pcond,nullptr);//初始化条件变量pthread_cond_init(&_ccond,nullptr);}//放数据void push(const T &in)//输入型参数,一般用const 和引用&修饰{//线程进来先加锁,生产者pthread_mutex_lock(&_mutex);//细节2:充当条件判断的语法必须是while,不能用if//在放入数据的前,需要判断阻塞队列是否满了while(is_full()){//细节1:pthread_cond_wait这个函数的第二个参数,必须是我们正在使用的互斥锁//该函数会以原子性的方式将锁释放,并且将自己挂起//该函数在被唤醒返回的时候,会自动重新获取我们传入的锁//满了不能生产,那就去对应的条件变量中等待pthread_cond_wait(&_pcond,&_mutex);}//没满,就放入数据_q.push(in);//此时我们可以绝对保证,阻塞队列里面一定有数据,至少有一个//此时可以让消费者来消费,即唤醒消费者pthread_cond_signal(&_ccond);//这里可以加一定的策略,待队列的元素超过三分之一再唤醒消费者//最后解锁pthread_mutex_unlock(&_mutex);}//拿数据void pop(T *out)//输出型参数,一般用*{pthread_mutex_lock(&_mutex);//消费者和生产者用的是同一把锁,因为必须让生产者和消费者互斥//判断队列是否空的,如果空,那就等待while(is_empty()){pthread_cond_wait(&_ccond,&_mutex);}//走到这里,我们能保证一定不为空//拿数据*out =_q.front();_q.pop();//绝对能保证,阻塞队列里面至少有一个空的位置//此时可以将生产者唤醒,去生产//细节3:pthread_cond_signal:这个函数,可以放在临界区内部,也可以放在外部pthread_cond_signal(&_pcond);//这里也可以有一定的策略pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}
private:
//判断阻队列是否为空bool is_empty(){return _q.empty();}//判断阻队列是否满了bool is_full(){return _q.size()==_maxcap;}
private:std::queue<T> _q;int _maxcap;//阻塞队列中最多能有多少个元素//_q这个队列本身不是线程安全的,因此我们保护起来pthread_mutex_t _mutex;//当队列满了之后,生产者进入休眠,队列空,消费者进入休眠,因此定义条件变量pthread_cond_t _pcond;//生产者对应的条件变量pthread_cond_t _ccond;//消费者对应的条件变量
};

#include <ctime>
#include <sys/types.h>
#include <unistd.h>
using namespace std;
#include "BlockQueue.hpp"
#include "Task.hpp"const string oper="+-*/%";int mymath(int x,int y,char op)
{int result = 0;switch(op){case '+':result = x+y;break;case '-':result = x-y;break;case '*':result = x*y;break;case '/':{if(y==0){std::cerr<<"div zero error!"<<std::endl;result = -1;}elseresult = x/y;}break;case '%':{if(y==0){std::cerr<<"mod zero error!"<<std::endl;result = -1;}elseresult = x%y;}break;default:break;}return result;
}
void* consumer(void *_bq)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(_bq);while(true){//消费活动Task t;bq->pop(&t);std::cout<<"消费任务: "<<t()<<std::endl;//sleep(1);}return nullptr;
}void* productor(void *_bq)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(_bq);while(true){//生产活动int x = rand()%10 +1;//1到10的数据int y = rand()%5 +1;//1到5int operCode = rand()%oper.size();//可以模上字符串的长度,拿到下标范围Task t(x,y,oper[operCode],mymath);bq->push(t);std::cout<<"生产任务: "<<t.toTaskString() <<std::endl;sleep(1);}return nullptr;}
int main()
{srand((unsigned long)time(nullptr)^getpid());//随机生成数据据BlockQueue<Task>* bq = new BlockQueue<Task>();//阻塞队列pthread_t c,p;//消费者和生产者线程pthread_create(&c,nullptr,consumer,bq);//消费者 pthread_create(&p,nullptr,productor,bq);//生产者pthread_join(c,nullptr);pthread_join(p,nullptr);return 0; 
}

单消费者单生产者加一个存储者(消费者)

现在,我们改一下需求,我们让一个生产者派发任务,一个消费者处理任务,一个消费者来记录任务结果,并且将结果记录在文件中,实现多线程协同。其中,处理任务的消费者线程,既是消费者,也是生产者。

代码如下:代码解释均在注释中:

Bolckqueue.hpp的代码依然如上,main函数所在的代码如下:


#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;//C:计算
//S:存储
template<class C,class S>
class BlockQueues//阻塞队列集
{
public:BlockQueue<C> *c_bq;//计算队列BlockQueue<S> *s_bq;//存储队列};void* productor(void *_bq)
{BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(_bq))->c_bq;while(true){//生产活动int x = rand()%10 +1;//1到10的数据int y = rand()%5 +1;//1到5int operCode = rand()%oper.size();//可以模上字符串的长度,拿到下标范围//生产者将任务创建出来CalTask t(x,y,oper[operCode],mymath);//任务创建出来后放入计算阻塞队列当中bq->push(t);std::cout<<"pro thread 生产任务: "<<t.toTaskString() <<std::endl;sleep(1);}return nullptr;
}void* consumer(void *_bq)
{BlockQueue<CalTask> *bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(_bq))->c_bq;//即拿到了处理数据的阻塞队列BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(_bq))->s_bq;//也拿到了存储数据的阻塞队列while(true){//消费活动//这个消费者先计算任务//先创建计算任务的对象CalTask t;//然后从队列中删除这个任务(拿出这个任务)bq->pop(&t);//通过对象的仿函数获取对应的数据string result = t();//获取这个数据std::cout<<"cal thread 完成计算任务: "<<result<<"...done"<<std::endl;//计算后,将结果存储//存储任务//创建存储对象,然后将结果和对应的函数传入SaveTask save(result,Save);//放入到存储阻塞队列当中。save_bq->push(save);std::cout<<"cal thread 推送需要保存的任务完成....: "<<t()<<std::endl;//sleep(1);}return nullptr;
}void *saver(void *_bq)
{//存储队列BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(_bq))->s_bq;//也拿到了存储数据的阻塞队列while(true){//获取存储任务SaveTask t;save_bq->pop(&t);//通过回调方法进行数据存储,存储到文件当中t();cout<<"saver thread 保存任务完成..."<<endl;}return nullptr;
}
int main()
{srand((unsigned long)time(nullptr)^getpid());//随机生成数据据BlockQueues<CalTask,SaveTask> bqs;//阻塞队列集合的对象//使用阻塞队列集合的对象构建计算队列和存储队列的对象bqs.c_bq = new BlockQueue<CalTask>();//计算阻塞队列bqs.s_bq = new BlockQueue<SaveTask>();//存储阻塞队列pthread_t c,p,s;//消费者和生产者线程和保存者(消费者)pthread_create(&c,nullptr,consumer,&bqs);//消费者 pthread_create(&p,nullptr,productor,&bqs);//生产者pthread_create(&s,nullptr,saver,&bqs);//保存者pthread_join(c,nullptr);pthread_join(p,nullptr);pthread_join(s,nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0; 
}

派发任务的代码如下:

#pragma once
#include <iostream>
#include <functional>
#include <cstdio>
using namespace std;class CalTask//表示计算任务
{using func_t = std::function<int(int,int,char)>;// typedef std::function<int(int,int)> func_t;
public:CalTask(){}CalTask(int x, int y,char op, func_t func):_x(x), _y(y),_op(op), _callback(func){}string operator()(){int result = _callback(_x, _y,_op);char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);return buffer;}string toTaskString(){char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);return buffer;}
private:int _x;int _y;char _op;func_t _callback;
};
const string oper="+-*/%";
int mymath(int x,int y,char op)
{int result = 0;switch(op){case '+':result = x+y;break;case '-':result = x-y;break;case '*':result = x*y;break;case '/':{if(y==0){std::cerr<<"div zero error!"<<std::endl;result = -1;}elseresult = x/y;}break;case '%':{if(y==0){std::cerr<<"mod zero error!"<<std::endl;result = -1;}elseresult = x%y;}break;default:break;}return result;
}
class SaveTask//保存任务 
{using func_t = function<void(const string&)>;//typedef function<void(const string &)> func_t;
public:SaveTask(){}SaveTask(const string &message,func_t func):_message(message),_func(func){}void operator()(){_func(_message);}
private:string _message;func_t _func;};void Save(const string &message)
{const string target = "./log.txt";FILE *fp = fopen(target.c_str(),"a+");if(!fp){cerr<<"fopen error"<<endl;return;}fputs(message.c_str(),fp);fputs("\n",fp);fclose(fp);
}

结果:

 

多消费者多生产者

通过创建多个线程即可:

int main()
{srand((unsigned long)time(nullptr)^getpid());//随机生成数据据BlockQueues<CalTask,SaveTask> bqs;//阻塞队列集合的对象//使用阻塞队列集合的对象构建计算队列和存储队列的对象bqs.c_bq = new BlockQueue<CalTask>();//计算阻塞队列bqs.s_bq = new BlockQueue<SaveTask>();//存储阻塞队列pthread_t c[2],p[3],s;//消费者和生产者线程和保存者(消费者)pthread_create(p,nullptr,productor,&bqs);//生产者pthread_create(p+1,nullptr,productor,&bqs);//生产者pthread_create(p+2,nullptr,productor,&bqs);//生产者pthread_create(c,nullptr,consumer,&bqs);//消费者 pthread_create(c+1,nullptr,consumer,&bqs);//消费者 pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);pthread_join(p[2],nullptr);pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0; 
}

多生产者和多消费者之所以可以存在,那是因为锁的存在,使得多个生产者和多个消费者在一条阻塞队列中串行运行,只有一个线程可以访问。

生产消费者模型为什么高效

现在,我们来反思一个很重要的问题:生产消费者模型为什么高效

1.生产和消费的完成过程

对于生产者而言,完整的生产任务过程是从数据库、外设等等拿到用户的数据,然后放入到共享资源(阻塞队列)当中。

对于消费者而言,完成的消费任务是从阻塞队列中拿到数据后,到处理完数据的过程。

2.时间花费

对于生产者来说,它获取和构建任务是需要花时间的。对于消费者来说,它把任务从队列中拿出来并且处理任务也是需要花时间的。

3.消费者或生产者并发执行任务

每一个生产者或消费者在拿到数据,并且执行任务的同时,此时另外一个生产者也会从某处拿到下一份的用户数据进行构建等操作,另外一个消费者也会从队列中拿到下一份数据进行处理...依次类推,生产者和消费者并发性地执行任务!

因此,生产消费者模型高效的重要原因是多个生产者或多个消费者能够并发性地执行!

创建多线程生产和消费的意义便是可以让生产者在生产的时候、消费者在消费的时候并发执行!

更多推荐

Linux线程同步与互斥(二)/生产消费者模型

本文发布于:2024-03-10 09:58:23,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1727654.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:线程   模型   消费者   互斥   Linux

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!