在生产者—消费者模型中,通常需要考虑生产速度与消费速度不匹配的问题。为了保证队列中的元素有序流转,需要设计相应的队列调度模型。队列中的每个元素在任意时刻处于以下 4 种状态之一:空闲(尚未被生产的空数据)、就绪(生产完成、等待消费)、正在被消费、正在被生产。据此可以设计两个队列 FreeQue 和 ReadyQue,即空闲队列和就绪队列:每次生产时,从空闲队列取出一个空闲节点,填充数据后送入就绪队列;每次消费时,从就绪队列取走一个节点进行消费,消费完成后再送回空闲队列。这样就把各个环节有序地组合起来,实现了较为完备的生产消费模型。
//CQModel.h
#pragma once
#include <queue>
#include <mutex>
#include <memory>
/// Queue node that describes one payload buffer: a raw pointer, its length
/// in bytes, and an optional deleter used to release the buffer.
///
/// The destructor passes the stored buffer to the deleter (when both are
/// set), so an instance owns whatever buffer it was initialized with.
class ElementMeta
{
public:
    /// Callback that releases a payload buffer previously handed to
    /// InitElementMeta(); receives the same pointer and length.
    using Deleter = void (*)(void* data, unsigned int len);

    ElementMeta();
    ~ElementMeta();

    // Copying would duplicate the raw buffer pointer and the destructor of
    // each copy would then release the same buffer; forbid it outright.
    ElementMeta(const ElementMeta&) = delete;
    ElementMeta& operator=(const ElementMeta&) = delete;

    /// Stores the buffer description.
    /// @param data payload buffer (may be nullptr)
    /// @param len  size of the buffer in bytes
    /// @param func deleter invoked on destruction; nullptr means the buffer
    ///             is never released by this object
    void InitElementMeta(void* data, unsigned int len, Deleter func = nullptr);

    /// @return true when a non-null buffer with a non-zero length is stored.
    bool IsInitialized();

    /// Copies the stored pointer and length to the given outputs; either
    /// output pointer may be nullptr to skip it.
    void GetElementMeta(const void** data, unsigned int* len);

private:
    Deleter m_meta_deleter = nullptr;
    void* m_meta_data = nullptr;
    unsigned int m_meta_len = 0;
};
/// Size-limited FIFO of shared ElementMeta nodes with an internal mutex.
class Queue
{
public:
    /// @param max_cnt size limit checked by AppendElement().
    // explicit: an int must never silently convert into a Queue.
    explicit Queue(int max_cnt);

    /// Appends a node at the back.
    /// @return 0 on success, -1 when the size limit rejects the node.
    int AppendElement(const std::shared_ptr<ElementMeta>& meta);

    /// Removes and returns the node at the front.
    std::shared_ptr<ElementMeta> GetFirstElement();

    /// Removes and returns the node at the back.
    std::shared_ptr<ElementMeta> GetLastElement();

    /// @return the size limit passed to the constructor.
    int GetMaxQueSize();

    /// @return the number of nodes currently stored.
    int GetCurrentQueSize();

private:
    std::deque<std::shared_ptr<ElementMeta>> m_Que;
    std::mutex m_Mtx;       // taken by the operations that modify m_Que
    int m_nQueMaxSize = 0;
};
/// Abstract pull/push interface over the queue pair; implemented separately
/// for the producer side and the consumer side.
class QueOpreator
{
public:
    // Implementations are held through QueOpreator*; a virtual destructor
    // makes deleting through that base pointer well-defined.
    virtual ~QueOpreator() = default;

    /// Takes the next node this side should work on.
    virtual std::shared_ptr<ElementMeta> PullElementFromQue() = 0;

    /// Hands a node over once this side is done with it.
    /// @return status code of the underlying queue operation.
    virtual int PushElementToQue(const std::shared_ptr<ElementMeta>& elem) = 0;
};
// 给生产者用的操作类
class QueForP :public QueOpreator
{
public:
QueForP() = delete;
QueForP(Queue* fq, Queue* rq);
std::shared_ptr<ElementMeta> PullElementFromQue();//operate m_FQue;
int PushElementToQue(const std::shared_ptr<ElementMeta>& elem);//operate m_RQue;
private:
Queue* m_FQue = nullptr;
Queue* m_RQue = nullptr;
};
// 给消费者用的操作类
class QueForC :public QueOpreator
{
public:
QueForC() = delete;
QueForC(Queue* fq, Queue* rq);
std::shared_ptr<ElementMeta> PullElementFromQue();//operate m_RQue;
int PushElementToQue(const std::shared_ptr<ElementMeta>& elem);//operate m_FQue;
private:
Queue* m_FQue;
Queue* m_RQue;
};
//"CQModel.cpp"
#include "CQModel.h"
ElementMeta::ElementMeta() : m_meta_deleter(nullptr),
m_meta_data(nullptr),
m_meta_len(0)
{}
// Stores the buffer description (pointer, length in bytes, optional deleter).
//
// data: payload buffer; may be nullptr.
// len:  size of the buffer in bytes.
// func: deleter run by the destructor; nullptr leaves the buffer untouched.
void ElementMeta::InitElementMeta(void* data, unsigned int len, ElementMeta::Deleter func)
{
    // Release the buffer held so far; overwriting the pointer without doing
    // so would leak it. Skipped when the caller passes the very same buffer
    // again, which must stay alive.
    if (m_meta_data != nullptr && m_meta_data != data && m_meta_deleter != nullptr)
    {
        m_meta_deleter(m_meta_data, m_meta_len);
    }

    m_meta_data = data;
    m_meta_len = len;
    m_meta_deleter = func;
}
// Reports whether the node carries a payload: a non-null buffer whose
// length is greater than zero.
bool ElementMeta::IsInitialized()
{
    if (m_meta_data == nullptr)
    {
        return false;
    }
    return m_meta_len != 0;
}
// Releases the payload through the stored deleter. Nothing happens when
// either the buffer or the deleter is missing.
ElementMeta::~ElementMeta()
{
    const bool owns_buffer = (m_meta_data != nullptr) && (m_meta_deleter != nullptr);
    if (!owns_buffer)
    {
        return;
    }

    m_meta_deleter(m_meta_data, m_meta_len);
    m_meta_deleter = nullptr;
    m_meta_data = nullptr;
}
// Writes the stored buffer pointer and length to the outputs the caller
// supplied; a nullptr output is simply skipped.
void ElementMeta::GetElementMeta(const void** data, unsigned int* len)
{
    if (len)
    {
        *len = m_meta_len;
    }
    if (data)
    {
        *data = m_meta_data;
    }
}
Queue::Queue(int max_cnt) :m_nQueMaxSize(max_cnt)
{}
// Appends `meta` at the back of the queue.
//
// Returns 0 on success and -1 when the queue already holds m_nQueMaxSize
// elements (a non-positive limit therefore rejects every append).
int Queue::AppendElement(const std::shared_ptr<ElementMeta>& meta)
{
    std::lock_guard<std::mutex> locker(m_Mtx);

    // `>=` keeps the queue at no more than m_nQueMaxSize elements; the
    // previous `>` admitted one extra. The sign is checked before the cast so
    // a negative limit does not turn into a huge unsigned value.
    if (m_nQueMaxSize <= 0 ||
        m_Que.size() >= static_cast<decltype(m_Que.size())>(m_nQueMaxSize))
    {
        return -1;
    }

    m_Que.push_back(meta);
    return 0;
}
// Removes the front element and returns it; returns an empty shared_ptr
// when the queue holds nothing.
std::shared_ptr<ElementMeta> Queue::GetFirstElement()
{
    std::lock_guard<std::mutex> guard(m_Mtx);

    std::shared_ptr<ElementMeta> head;
    if (!m_Que.empty())
    {
        head = m_Que.front();
        m_Que.pop_front();
    }
    return head;
}
// Removes the back element and returns it; returns an empty shared_ptr
// when the queue holds nothing.
std::shared_ptr<ElementMeta> Queue::GetLastElement()
{
    std::lock_guard<std::mutex> locker(m_Mtx);

    // back()/pop_back() on an empty deque is undefined behaviour, so report
    // "nothing available" the same way GetFirstElement() does.
    if (m_Que.empty())
    {
        return std::shared_ptr<ElementMeta>();
    }

    std::shared_ptr<ElementMeta> data = m_Que.back();
    m_Que.pop_back();
    return data;
}
// Returns the size limit given at construction, read under the queue mutex.
int Queue::GetMaxQueSize()
{
    std::lock_guard<std::mutex> guard(m_Mtx);
    const int limit = m_nQueMaxSize;
    return limit;
}
int Queue::GetCurrentQueSize()
{
return m_Que.size();;
}
// Binds the operator to its queue pair: the first argument becomes the free
// queue, the second the ready queue.
QueForP::QueForP(Queue* fq, Queue* rq)
    : m_FQue(fq), m_RQue(rq)
{
}
// Takes the oldest node from the free queue (m_FQue); the result is an
// empty pointer when that queue has nothing to offer.
std::shared_ptr<ElementMeta> QueForP::PullElementFromQue()
{
    std::shared_ptr<ElementMeta> node = m_FQue->GetFirstElement();
    return node;
}
// Publishes a filled node to the ready queue (m_RQue).
//
// When the ready queue rejects the node, the oldest ready node is dropped
// and recycled into the free queue, then the append is retried once.
// Returns the result of the last append to the ready queue.
int QueForP::PushElementToQue(const std::shared_ptr<ElementMeta>& elem)
{
    const int ret = m_RQue->AppendElement(elem);
    if (ret != -1)
    {
        return ret;
    }

    // Drop the oldest ready node and reuse it as a free node.
    std::shared_ptr<ElementMeta> oldest = m_RQue->GetFirstElement();

    // The ready queue may have been drained in the meantime, in which case
    // GetFirstElement() yields an empty pointer; do not feed that into the
    // free queue.
    if (oldest)
    {
        // Best effort: if the free queue rejects the node, it is simply
        // released when `oldest` goes out of scope.
        m_FQue->AppendElement(oldest);
    }

    return m_RQue->AppendElement(elem);
}
// Binds the operator to its queue pair: the first argument becomes the free
// queue, the second the ready queue.
QueForC::QueForC(Queue* fq, Queue* rq)
    : m_FQue(fq), m_RQue(rq)
{
}
// Takes the oldest node from the ready queue (m_RQue); the result is an
// empty pointer when nothing is ready.
std::shared_ptr<ElementMeta> QueForC::PullElementFromQue()
{
    std::shared_ptr<ElementMeta> node = m_RQue->GetFirstElement();
    return node;
}
// Returns a consumed node to the free queue (m_FQue) so it can be reused.
//
// Returns the result of the append: 0 on success, -1 when the free queue
// rejected the node. A rejected node is not retried; it is released once
// the last shared_ptr to it goes away.
int QueForC::PushElementToQue(const std::shared_ptr<ElementMeta>& elem)
{
    // Previously this function returned -1 unconditionally and discarded the
    // real outcome.
    return m_FQue->AppendElement(elem);
}
生产者代码示例:
//Producer.h
#pragma once
class Queue;
class QueOpreator;
/// Producer side of the model: obtains a node through its queue operator,
/// fills it with a message and publishes it.
class Producer
{
public:
    /// Creates the queue operator for the given queue pair.
    void Init(Queue* rq, Queue* fq);

    /// Produces one message.
    void Process();

private:
    QueOpreator* m_Que{nullptr};  // set by Init()
};
//Producer.cpp
#include "Producer.h"

#include <cstdio>
#include <cstring>

#include "CQModel.h"
// Creates the producer-side queue operator.
//
// NOTE(review): QueForP's constructor is declared as (fq, rq), yet the
// arguments are forwarded here as (rq, fq). Inside the operator `rq`
// therefore acts as the free queue and `fq` as the ready queue.
// Consumer::Init forwards its arguments the same way, so both sides agree on
// the roles; change the order only on both sides together.
// The operator is allocated with `new` and is not released anywhere in the
// code shown here.
void Producer::Init(Queue* rq, Queue* fq)
{
    QueOpreator* op = new QueForP(rq, fq);
    m_Que = op;
}
// Sequence number embedded in every generated message.
static int id = 0;

// Produces one message: obtains a node (a recycled one when available, a
// fresh one otherwise), writes "msg id =<n>." into its buffer, prints it and
// publishes the node through the queue operator.
void Producer::Process()
{
    if (m_Que == nullptr)
    {
        return;  // Init() has not been called; nothing to operate on.
    }

    std::shared_ptr<ElementMeta> data = m_Que->PullElementFromQue();
    if (!data)
    {
        // No recyclable node available: start with a fresh, empty one.
        data = std::make_shared<ElementMeta>();
    }

    constexpr unsigned int kBufSize = 100;
    char* text = nullptr;
    if (!data->IsInitialized())
    {
        // Fresh node: allocate its buffer and hand ownership to the node.
        text = new char[kBufSize];
        std::snprintf(text, kBufSize, "msg id =%d.", id++);
        // The buffer was created with new char[], so it must be released as
        // char[]; applying delete[] to a void* is undefined behaviour.
        data->InitElementMeta(text, kBufSize,
                              [](void* p, unsigned int) { delete[] static_cast<char*>(p); });
    }
    else
    {
        // Recycled node: overwrite the buffer it already owns.
        const void* d = nullptr;
        unsigned int len = 0;
        data->GetElementMeta(&d, &len);
        text = static_cast<char*>(const_cast<void*>(d));
        std::memset(text, 0x00, len);
        // snprintf writes at most len - 1 characters plus the terminator.
        std::snprintf(text, len, "msg id =%d.", id++);
    }

    // ... Process ...
    std::printf("Genernate: %s.\n", text);

    // If the push fails, the node is released when `data` goes out of scope.
    m_Que->PushElementToQue(data);
}
消费者代码示例:
// Consumer.h
#pragma once
class Queue;
class QueOpreator;
/// Consumer side of the model: obtains a ready node through its queue
/// operator, consumes it and hands it back.
class Consumer
{
public:
    /// Creates the queue operator for the given queue pair.
    void Init(Queue* rq, Queue* fq);

    /// Consumes one message if one is available.
    void Process();

private:
    QueOpreator* m_Que{nullptr};  // set by Init()
};
//Consumer.cpp
#include "Consumer.h"
#include "CQModel.h"
// Creates the consumer-side queue operator.
//
// NOTE(review): QueForC's constructor is declared as (fq, rq), yet the
// arguments are forwarded here as (rq, fq). Inside the operator `rq`
// therefore acts as the free queue and `fq` as the ready queue.
// Producer::Init forwards its arguments the same way, so both sides agree on
// the roles; change the order only on both sides together.
// The operator is allocated with `new` and is not released anywhere in the
// code shown here.
void Consumer::Init(Queue* rq, Queue* fq)
{
    QueOpreator* op = new QueForC(rq, fq);
    m_Que = op;
}
// Consumes one message: pulls a node through the queue operator, prints its
// payload as text and hands the node back for reuse. Does nothing when no
// usable node is available.
void Consumer::Process()
{
    if (m_Que == nullptr)
    {
        return;  // Init() has not been called; nothing to operate on.
    }

    const std::shared_ptr<ElementMeta> data = m_Que->PullElementFromQue();
    if (!data || !data->IsInitialized())
    {
        return;  // dummy node, skip
    }

    // .... Process ...
    const void* d = nullptr;
    data->GetElementMeta(&d, nullptr);  // the length is not needed here
    // The payload is only read, so keep it const instead of casting it away.
    printf("Consume: %s.\n", static_cast<const char*>(d));

    m_Que->PushElementToQue(data);
}
客户端使用:
#include "Consumer.h"
#include "Producer.h"
#include "CQModel.h"
#include <thread>
// Consumer thread body: sets up a Consumer on the given queue pair and then
// processes one message every five seconds, forever.
void ThreadFunc(Queue* rq, Queue* fq)
{
    Consumer consumer;
    consumer.Init(rq, fq);

    for (;;)
    {
        // Simulates a consumer that is slower than the producer.
        std::this_thread::sleep_for(std::chrono::milliseconds(5000));
        consumer.Process();
    }
}
int main()
{
Queue rq(1024), fq(1024);
std::thread t1(&ThreadFunc,&rq,&fq);
Producer p;
p.Init(&rq, &fq);
while (1)
{
p.Process();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));//生产者生产速度较快
}
t1.join();
return 0;
}
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » 队列调度(双队列)
发表评论 取消回复