Linux——生产者消费者模型

编程入门 行业动态 更新时间:2024-10-17 23:21:19

Linux——<a href=https://www.elefans.com/category/jswz/34/1768323.html style=生产者消费者模型"/>

Linux——生产者消费者模型

目录

一.为何要使用生产者消费者模型

 二.生产者消费者模型优点

 三.基于BlockingQueue的生产者消费者模型

1.BlockingQueue——阻塞队列

2.实现代码

 四.POSIX信号量

五.基于环形队列的生产消费模型


一.为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

 

 二.生产者消费者模型优点

  • 解耦:生产者和消费者不直接解除,无需关心对方的情况,仅仅自己与缓冲区解除。
  • 支持并发:并发的体现并不在于多个消费者同时从缓冲区中拿数据,也不是多个生产者同时从缓冲区放数据,而是消费者在处理拿到的数据的时候,生产者可以继续向缓冲区放数据。
  • 支持忙闲不均 :当生产者生产过快的时候,可以让生产者慢下来,当消费者消费过快的时候,可以让消费者慢下来。

 三.基于BlockingQueue的生产者消费者模型

 1.BlockingQueue——阻塞队列

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

2.实现代码

#include <iostream>
#include <string>
#include <queue>
#include <ctime>
#include <unistd.h>
#include <pthread.h>using namespace std;template <class T>
class BlockQueue
{
public:BlockQueue(size_t cap): _cap(cap){// 初始化条件变量pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}void push(T date){// 将任务push进去队列,多线程加锁,每一只能一个线程push任务pthread_mutex_lock(&_mutex);while (_q.size() == _cap) // 如果队列已经满了,生产者要被阻塞{pthread_cond_wait(&_p_cond, &_mutex);}_q.push(date);// 当push任务成功的时候,需要将唤醒消费者来处理数据pthread_cond_signal(&_c_cond);pthread_mutex_unlock(&_mutex);}T pop(){// 将任务从队列中拿出来,多线程加锁,每一只能一个线程拿任务pthread_mutex_lock(&_mutex);// 如果队列是空的就将消费者阻塞while (isempty()){pthread_cond_wait(&_c_cond, &_mutex);}T tmp = _q.front();_q.pop();// 成功拿到数据以后,唤醒生产者继续生产任务pthread_cond_signal(&_p_cond);pthread_mutex_unlock(&_mutex);return tmp;}~BlockQueue(){pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}private:bool isempty(){return _q.empty();}bool isfull(){return _q.size() == _cap;}private:queue<T> _q; // 队列size_t _cap; // 容量pthread_cond_t _c_cond;                             // 消费者条件变量pthread_cond_t _p_cond;                             // 生产者条件变量pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
};

cp模型:

#include "BlockQueue.hpp"using namespace std;// 构建任务
struct Task
{Task(int a, int b, char op): _a(a), _b(b), _op(op){}char _op;      // 运算符int _a;        // 运算数1int _b;        // 运算数2int ret;       // 结果int _exitcode; // 退出码
};void *push_task(void *args)
{BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);while (1){char op_arr[4] = {'+', '-', '*', '/'};int a = rand() % 10;int b = rand() % 10;char op = op_arr[(a * b) % 4];// 构建任务结构体Task tk(a, b, op);// push任务pBQ->push(tk);printf("%d %c %d =?\n", a, op, b);sleep(1);}return NULL;
}void *get_task(void *args)
{BlockQueue<Task> *pBQ = static_cast<BlockQueue<Task> *>(args);while (1){// 获取任务并处理Task tk = pBQ->pop();switch (tk._op){case '+':tk.ret = tk._a + tk._b;break;case '-':tk.ret = tk._a - tk._b;break;case '*':tk.ret = tk._a * tk._b;break;case '/':if (tk._b == 0){exit(-1);}tk.ret = tk._a / tk._b;break;default:break;}printf("%d %c %d = %d\n", tk._a, tk._op, tk._b, tk.ret);sleep(1);}return NULL;
}int main()
{BlockQueue<Task> BQ(5);pthread_t tid_c[4];pthread_t tid_p[4];srand(time(nullptr));// pushpthread_create(&tid_c[0], NULL, push_task, &BQ);pthread_create(&tid_c[1], NULL, push_task, &BQ);pthread_create(&tid_c[2], NULL, push_task, &BQ);pthread_create(&tid_c[3], NULL, push_task, &BQ);// getpthread_create(&tid_p[0], NULL, get_task, &BQ);pthread_create(&tid_p[1], NULL, get_task, &BQ);pthread_create(&tid_p[2], NULL, get_task, &BQ);pthread_create(&tid_p[3], NULL, get_task, &BQ);pthread_join(tid_c[0], NULL);pthread_join(tid_c[1], NULL);pthread_join(tid_c[2], NULL);pthread_join(tid_c[3], NULL);pthread_join(tid_p[0], NULL);pthread_join(tid_p[1], NULL);pthread_join(tid_p[2], NULL);pthread_join(tid_p[3], NULL);return 0;
}

测试结果:

 四.POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

定义信号量:

sem_t sem;

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  1. pshared:0表示线程间共享,非零表示进程间共享。
  2. value:信号量初始值。

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

功能:等待信号量,会将信号量的值减1。

int sem_wait(sem_t *sem); //P()

发布信号量:

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

int sem_post(sem_t *sem);//V()

说明:

  • 信号量本身就是一个计数器,用来描述可用资源的数目。
  • 信号量机制就像是我们看电影买票一样,是对资源的预定机制。
  • 只有申请到信号量才能对共享资源访问。
  • 只要我们申请信号量成功了,将来我们一定可以访问临界资源,就像看电影,我们只要买到了电影票,不管我们去不去电影院,电影院里一定有我们的位置。

五.基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性。

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

代码:

RingQueue.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>
#include <pthread.h>
#include <vector>
#include <unistd.h>
#include <semaphore.h>
#include "mutex.hpp"
#include "Task.hpp"
using namespace std;const size_t size = 5;template <class T>
class RingQueue
{void P(sem_t &sem) // 申请信号量{sem_wait(&sem);}void V(sem_t &sem) // 释放信号量{sem_post(&sem);}public:RingQueue(int cap = size): _cap(cap), _index_space(0), _index_date(0){// 初始化信号量sem_init(&_sem_date, 0, 0);    // 数据信号量初始化为0sem_init(&_sem_space, 0, cap); // 空间信号量初始化为容量大小// 初始化锁pthread_mutex_init(&_mutex, nullptr);_rq.resize(_cap);}void push(const T date){// 申请空间信号量P(_sem_space);{MutexGuard lock(&_mutex);_rq[_index_date++] = date;_index_date %= _cap;}// 释放数据信号量V(_sem_date);}T pop(){// 申请数据信号量P(_sem_date);T tmp;{MutexGuard lock(&_mutex);tmp = _rq[_index_space++];_index_space %= _cap;}// 释放空间信号量V(_sem_space);return tmp;}~RingQueue(){// 释放信号量和互斥锁sem_destroy(&_sem_date);sem_destroy(&_sem_space);pthread_mutex_destroy(&_mutex);}private:vector<T> _rq;size_t _cap; // 容量sem_t _sem_space; // 记录环形队列的空间信号量sem_t _sem_date;  // 记录环形队列的数据信号量size_t _index_space; // 生产者的生产位置size_t _index_date;  // 消费者的消费位置pthread_mutex_t _mutex; // 容量
};

mutex.hpp:

class Mutex
{
public:Mutex(pthread_mutex_t *mutex): _mutex(mutex){}void lock(){pthread_mutex_lock(_mutex);}void unlock(){pthread_mutex_unlock(_mutex);}~Mutex(){}private:pthread_mutex_t *_mutex;
};class MutexGuard
{
public:MutexGuard(pthread_mutex_t *mutex): _mutex(mutex){_mutex.lock();}~MutexGuard(){_mutex.unlock();}private:Mutex _mutex;
};

Task.hpp:

#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdlib>struct Task
{Task(int a = 1, int b = 1, char op = '+'): _a(a), _b(b), _op(op){}void run(){switch (_op){case '+':_ret = _a + _b;break;case '-':_ret = _a - _b;break;case '*':_ret = _a * _b;break;case '/':if (_b == 0){_exitcode = -1;exit(1);}_ret = _a / _b;break;default:break;}}void showtask(){printf("producer:%d %c %d = ?\n", _a, _op, _b);}void showresult(){printf("consumer:%d %c %d = %d(exitcode:%d)\n", _a, _op, _b, _ret, _exitcode);}~Task() {}private:int _a;int _b;char _op;int _ret;int _exitcode = 0;
};

pthread:

#include "RingQueue.hpp"void *run_p(void *args)
{char ops[4] = {'+', '-', '*', '/'};RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);while (1){int a = rand() % 100;int b = rand() % 100;int op = ops[(a * b) % 4];Task tk(a, b, op);RQ->push(tk);tk.showtask();sleep(1);}
}
void *run_c(void *args)
{RingQueue<Task> *RQ = static_cast<RingQueue<Task> *>(args);while (1){Task tk = RQ->pop();tk.run();tk.showresult();sleep(1);}
}int main()
{RingQueue<Task> *RQ = new RingQueue<Task>(5);srand(time(0));pthread_t tid_c[5];pthread_t tid_p[5];for (int i = 0; i < 5; i++){pthread_create(&tid_c[i], nullptr, run_c, RQ);pthread_create(&tid_p[i], nullptr, run_p, RQ);}for (int i = 0; i < 5; i++){pthread_join(tid_c[i], nullptr);pthread_join(tid_p[i], nullptr);}delete RQ;return 0;
}

更多推荐

Linux——生产者消费者模型

本文发布于:2023-12-05 18:52:57,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1665015.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:生产者   模型   消费者   Linux

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!