建设个人博客网站搜索优化的培训免费咨询
目录
前言
1. 阻塞队列
2. 环形队列
总结
前言
了解了线程控制、同步与互斥、以及消费者生产者模型,本篇文章为实践篇,对以上内容的实践,使用阻塞队列和环形队列来实现生产者消费者模型;
1. 阻塞队列
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出;
模型整体结构:
一个生产线程,一个消费线程,生产线程负责在队列中生产,一个线程负责去取进行消费;
- 队列满的时候,生产线程发生阻塞,不再生产
- 队列为空时,消费线程发生阻塞,不再拿任务;
在这个体系中要理清楚:
由谁来通知生产线程和消费线程? 谁来唤醒线程?
消费者生产者体系中只有生产者知道什么时候需要消费,只有消费者知道什么时候生产;
所以唤醒线程只能互相唤醒
整体结构较为简单,借助数据结构Queue来实现:
在此之前,可以依据RAII的思想对mutex进行封装,不需要手动的添加解锁(也可以使用C++库在的锁):
#pragma once#include <pthread.h>class mutex
{
public:mutex(pthread_mutex_t* lock):_lock(lock){}void Lock(){pthread_mutex_lock(_lock);}void UnLock(){pthread_mutex_unlock(_lock);}~mutex(){}
private:pthread_mutex_t* _lock;
};class LockGuard
{
public:LockGuard(pthread_mutex_t* lock):_mutex(lock){_mutex.Lock();}~LockGuard(){_mutex.UnLock();}
private:mutex _mutex;
};
阻塞队列成员设计:
const int defaultcap = 5;
template<class T>
class BlockQueue
{private:std::queue<T> _q; int _capacity;pthread_mutex_t _mutex;pthread_cond_t _p_cond;pthread_cond_t _c_cond;};
容量设置阻塞队列的大小,锁控制线程安全、两个条件变量用于控制生产线程与消费线程的同步;
核心接口也就只有两个:Push(生产)、Pop(消费);
void Push(const T& in) //生产者
{LockGuard lockguard(&_mutex);while (IsFull()){pthread_cond_wait(&_p_cond, &_mutex);}_q.push(in);// 有数据了可以唤醒消费者线程来进行消费 pthread_cond_signal(&_c_cond);}void Pop(T* out) //消费者
{LockGuard lockguard(&_mutex);while (IsEmpty()){pthread_cond_wait(&_c_cond, &_mutex);}*out = _q.front();_q.pop();// 唤醒生产者pthread_cond_signal(&_p_cond); //唤醒放在释放锁的前边后边都可以}
整体逻辑:
#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"const int defaultcap = 5;template<class T>
class BlockQueue
{
public:BlockQueue(int cap = defaultcap):_capacity(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_p_cond, nullptr);pthread_cond_init(&_c_cond, nullptr);}bool IsFull(){return _q.size() == _capacity;}bool IsEmpty(){return _q.size() == 0;}void Push(const T& in) //生产者{LockGuard lockguard(&_mutex);//pthread_mutex_lock(&_mutex);while (IsFull()){pthread_cond_wait(&_p_cond, &_mutex);}_q.push(in);// if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond); //达到生产水平线就唤醒线程pthread_cond_signal(&_c_cond);//唤醒线程时如果线程本来就醒着,不会有什么影响//pthread_mutex_unlock(&_mutex);}void Pop(T* out) //消费者{LockGuard lockguard(&_mutex);//pthread_mutex_lock(&_mutex);while (IsEmpty()){pthread_cond_wait(&_c_cond, &_mutex);}*out = _q.front();_q.pop();//消费者生产者体系中,只有生产者知道什么时候需要消费//只有消费者知道什么时候生产// if(_q.size() < _consumer_water_line) pthread_cond_signal(&_p_cond);pthread_cond_signal(&_p_cond);//唤醒放在释放锁的前边后边都可以// 在锁内唤醒,线程不会在条件变量上等了,转而会到阻塞到申请锁的队列//pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);}
private:std::queue<T> _q;int _capacity;pthread_mutex_t _mutex;pthread_cond_t _p_cond;pthread_cond_t _c_cond;// int _consumer_water_line; // _consumer_water_line = _capacity / 3 * 2// int _productor_water_line; // _productor_water_line = _capacity / 3
};void *consumer(void *arg)
{BlockQueue<int> *bqp = (BlockQueue<int> *)arg;int data;for (;;){bqp->Pop(&data);std::cout << "Consume data done : " << data << std::endl;}
}// more faster
void *producter(void *arg)
{BlockQueue<int> *bqp = (BlockQueue<int> *)arg;srand((unsigned long)time(NULL));for (;;){int data = rand() % 1024;bqp->Push(data);std::cout << "Prodoct data done: " << data << std::endl;sleep(1);}
}int main()
{BlockQueue<int> bq;pthread_t c, p;pthread_create(&c, NULL, consumer, (void *)&bq);pthread_create(&p, NULL, producter, (void *)&bq);pthread_join(c, NULL);pthread_join(p, NULL);return 0;
}
2. 环形队列
阻塞队列的方法也有部分欠缺,比如:把资源放到队列中,使用互斥锁一次就只能一个线程访问这个队列;这也会降低效率;这时就可以使用信号量来解决;
使用互斥锁时,把队列看成一个整体来访问,使用信号量可以完美解决 多个线程同时访问队列的不同区域;
基于信号量实现环形队列:
- 信号量本质就是一个计数器
- 申请信号量本质是预定资源
- PV操作是原子性的
场景分析:
- 生产者打满这个队列后就不能再生产了(会覆盖原有资源)
- 消费者不能超过生产者
生产者和消费者指向同一位置的两种情况:
- 队列为空(只能让生产者跑)
- 队列为满(只能让消费者跑)
也就是说我们需要局部维持互斥和同步
对资源的描述与认识:空间-->p,数据-->d
所以我们需要两个信号量:
- p: _space_sem N
- c: _data_sem 0
生产者生产:
P(_space_sem)// _space_sem 减减
// 生产数据/任务
V(_data_sem)// _data_sem 加加
消费者消费:
P(_data_sem)// _data_sem 减减
// 生产数据/任务
V(_space_sem )// _space_sem 加加
完整代码:
#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"
#include <vector>
#include <stdlib.h>
#include <semaphore.h>#define NUM 16class RingQueue
{
private:std::vector<int> q;int cap; // 队列容量sem_t data_sem; // 数据的数量sem_t space_sem; // 空余空间数量int consume_step; // 消费偏移量int product_step; // 生产偏移量public:RingQueue(int _cap = NUM) : q(_cap), cap(_cap){sem_init(&data_sem, 0, 0);sem_init(&space_sem, 0, cap);consume_step = 0;product_step = 0;}// 生产void PutData(const int &data){sem_wait(&space_sem); // Pq[consume_step] = data;consume_step++;consume_step %= cap;sem_post(&data_sem); // V}// 消费void GetData(int &data){sem_wait(&data_sem);data = q[product_step];product_step++;product_step %= cap;sem_post(&space_sem);}~RingQueue(){sem_destroy(&data_sem);sem_destroy(&space_sem);}
};void *consumer(void *arg)
{RingQueue *rqp = (RingQueue *)arg;int data;for (;;){rqp->GetData(data);std::cout << "Consume data done : " << data << std::endl;sleep(1);}
}void *producter(void *arg)
{RingQueue *rqp = (RingQueue *)arg;srand((unsigned long)time(NULL));for (;;){int data = rand() % 1024;rqp->PutData(data);std::cout << "Prodoct data done: " << data << std::endl;// sleep(1);}
}int main()
{RingQueue rq;pthread_t c, p;pthread_create(&c, NULL, consumer, (void *)&rq);pthread_create(&p, NULL, producter, (void *)&rq);pthread_join(c, NULL);pthread_join(p, NULL);
}
这是一个简易版本的环形队列,如对信号量或线程控制不太熟悉的伙伴,可以阅读我的这篇文章:
【Linux线程】线程互斥与同步
【Linux线程】线程控制
总结
以上便是本文的全部内容,希望对你有所帮助,感谢阅读!