在生产者—消费者模型中，我们通常要考虑生产速度和消费速度不匹配的问题。要保证队列中的元素有序运转，就需要设计队列调度模型。队列元素在任意时刻的状态可以分为 4 种：空闲（未被生产的空数据）、就绪（生产完成待消费）、正在被消费、正在被生产。据此可以设计两个队列 FreeQue 和 ReadyQue，即空闲队列和就绪队列：每次生产时，从空闲队列取一个空闲节点，填充数据后送入就绪队列；每次消费时，从就绪队列取走一个节点进行消费，消费完成后送回空闲队列。这样就把各个环节有序地组合起来，实现了较为完备的生产消费模型。

//CQModel.h

#pragma once
#include <queue>
#include <mutex>
#include <memory>

/// Describes one queue node: an opaque buffer, the length recorded for it,
/// and an optional callback that releases the buffer.
class ElementMeta
{
public:
	/// Callback invoked by the destructor to release the attached buffer.
	using Deleter = void(*)(void* data, unsigned int len);

	ElementMeta();
	~ElementMeta();

	// The destructor passes m_meta_data to the deleter, so a copied object
	// would release the same buffer twice. Copying is therefore disabled
	// (which also suppresses the implicit move operations).
	ElementMeta(const ElementMeta&) = delete;
	ElementMeta& operator=(const ElementMeta&) = delete;

	/// Attaches a buffer, its length and an optional deleter to this node.
	void InitElementMeta(void* data, unsigned int len, Deleter  func= nullptr);
	/// Returns true once a non-null buffer with a non-zero length is attached.
	bool IsInitialized();
	/// Copies the stored pointer / length into whichever out-parameters are non-null.
	void GetElementMeta(const void** data, unsigned int* len);
private:
	Deleter m_meta_deleter = nullptr;   // may stay null: the buffer is then never released here
	void* m_meta_data = nullptr;        // attached buffer
	unsigned int m_meta_len = 0;        // length recorded for the buffer
};

/// Bounded FIFO-style container of shared ElementMeta nodes.
class Queue
{
public:
	/// @param max_cnt configured size limit of the queue.
	/// `explicit` prevents an int from silently converting into a Queue.
	explicit Queue(int max_cnt);
	/// Appends a node at the back; returns 0 on success, -1 when the queue is full.
	int AppendElement(const std::shared_ptr<ElementMeta>& meta);
	/// Removes and returns the front node.
	std::shared_ptr<ElementMeta> GetFirstElement();
	/// Removes and returns the back node.
	std::shared_ptr<ElementMeta> GetLastElement();
	/// Returns the configured size limit.
	int GetMaxQueSize();
	/// Returns the number of nodes currently stored.
	int GetCurrentQueSize();
private:
	std::deque<std::shared_ptr<ElementMeta>> m_Que;
	std::mutex m_Mtx;        // intended to guard m_Que
	int m_nQueMaxSize = 0;
};

/// Abstract interface through which a producer or a consumer exchanges
/// nodes with the underlying queues.
class QueOpreator
{
public:
	// Implementations are created with `new` and held through a QueOpreator*,
	// so destroying one through the base pointer requires a virtual destructor.
	virtual ~QueOpreator() = default;
	/// Takes one node out of the queue this side reads from.
	virtual std::shared_ptr<ElementMeta> PullElementFromQue() = 0;
	/// Hands one node to the queue this side writes to; returns the append result.
	virtual int PushElementToQue(const std::shared_ptr<ElementMeta>& elem) = 0;
};

// 给生产者用的操作类
class QueForP :public QueOpreator
{
public:
	QueForP() = delete;
	QueForP(Queue* fq, Queue* rq);
	std::shared_ptr<ElementMeta> PullElementFromQue();//operate m_FQue;
	int PushElementToQue(const std::shared_ptr<ElementMeta>& elem);//operate m_RQue;
private:
	Queue* m_FQue = nullptr;
	Queue* m_RQue = nullptr;
};

// 给消费者用的操作类
class QueForC :public QueOpreator
{
public:
	QueForC() = delete;
	QueForC(Queue* fq, Queue* rq);
	std::shared_ptr<ElementMeta> PullElementFromQue();//operate m_RQue;
	int PushElementToQue(const std::shared_ptr<ElementMeta>& elem);//operate m_FQue;
private:
	Queue* m_FQue;
	Queue* m_RQue;
};




//"CQModel.cpp"
#include "CQModel.h"

ElementMeta::ElementMeta() : m_meta_deleter(nullptr),
m_meta_data(nullptr),
m_meta_len(0)
{}

// Attaches `data` (with recorded length `len`) to this node and remembers
// `func` as the callback the destructor uses to release it.
//
// If another buffer with a deleter is already attached, it is released first;
// previously the old pointer was silently overwritten and leaked. Passing the
// same pointer again only updates the length/deleter.
void ElementMeta::InitElementMeta(void* data, unsigned int len, ElementMeta::Deleter  func)
{
	const bool owns_other_buffer =
		m_meta_data != nullptr && m_meta_data != data && m_meta_deleter != nullptr;
	if (owns_other_buffer)
	{
		m_meta_deleter(m_meta_data, m_meta_len);
	}
	m_meta_deleter = func;
	m_meta_data = data;
	m_meta_len = len;
}

// A node counts as initialized only when it has both a buffer and a
// non-zero recorded length.
bool ElementMeta::IsInitialized()
{
	if (m_meta_data == nullptr)
	{
		return false;
	}
	return m_meta_len != 0;
}

// Releases the attached buffer through the stored deleter, when both exist.
ElementMeta::~ElementMeta()
{
	const bool releasable = (m_meta_deleter != nullptr) && (m_meta_data != nullptr);
	if (!releasable)
	{
		return; // nothing attached, or no callback to release it with
	}
	m_meta_deleter(m_meta_data, m_meta_len);
	m_meta_deleter = nullptr;
	m_meta_data = nullptr;
}

// Reports the stored buffer pointer and length. Either out-parameter may be
// null, in which case that value is simply not reported.
void ElementMeta::GetElementMeta(const void** data, unsigned int* len)
{
	if (data)
	{
		*data = m_meta_data;
	}
	if (len)
	{
		*len = m_meta_len;
	}
}

Queue::Queue(int max_cnt) :m_nQueMaxSize(max_cnt)
{}

// Appends `meta` at the back of the queue.
// Returns 0 on success and -1 when the queue already holds m_nQueMaxSize nodes.
int Queue::AppendElement(const std::shared_ptr<ElementMeta>& meta)
{
	std::lock_guard<std::mutex> locker(m_Mtx);
	// `>=` keeps the queue at no more than m_nQueMaxSize nodes (the previous `>`
	// admitted one extra). The limit is checked for <= 0 before the cast so a
	// negative value cannot wrap around to a huge unsigned bound.
	if (m_nQueMaxSize <= 0 || m_Que.size() >= static_cast<std::size_t>(m_nQueMaxSize))
		return -1;
	m_Que.push_back(meta);
	return 0;
}
// Removes and returns the node at the front of the queue.
// Returns a null shared_ptr when the queue is empty.
std::shared_ptr<ElementMeta> Queue::GetFirstElement()
{
	std::lock_guard<std::mutex> guard(m_Mtx);
	std::shared_ptr<ElementMeta> head; // stays null if there is nothing to take
	if (!m_Que.empty())
	{
		head = m_Que.front();
		m_Que.pop_front();
	}
	return head;
}

// Removes and returns the node at the back of the queue.
// Returns a null shared_ptr when the queue is empty, matching GetFirstElement;
// calling back()/pop_back() on an empty deque is undefined behaviour.
std::shared_ptr<ElementMeta> Queue::GetLastElement()
{
	std::lock_guard<std::mutex> locker(m_Mtx);
	if (m_Que.empty())
	{
		return std::shared_ptr<ElementMeta>();
	}
	std::shared_ptr<ElementMeta> data = m_Que.back();
	m_Que.pop_back();
	return data;
}

// Returns the size limit given at construction (read under the mutex).
int Queue::GetMaxQueSize()
{
	std::lock_guard<std::mutex> guard(m_Mtx);
	const int limit = m_nQueMaxSize;
	return limit;
}

int Queue::GetCurrentQueSize()
{

	return m_Que.size();;
}


// Remembers the two queues this operator works on; neither is owned.
QueForP::QueForP(Queue* fq, Queue* rq)
	: m_FQue(fq)
	, m_RQue(rq)
{
}

// Takes the front node of m_FQue; the result is null when that queue is empty.
std::shared_ptr<ElementMeta> QueForP::PullElementFromQue()
{
	std::shared_ptr<ElementMeta> node = m_FQue->GetFirstElement();
	return node;
}

// Appends `elem` to m_RQue. When m_RQue is full, the oldest node is taken
// out, handed to m_FQue for reuse, and the append is retried once.
// Returns the result of the (possibly retried) append: 0 on success, -1 on failure.
int QueForP::PushElementToQue(const  std::shared_ptr<ElementMeta>& elem)
{
	const int ret = m_RQue->AppendElement(elem);
	if (ret != -1)
	{
		return ret;
	}

	// Drop the oldest node and recycle it.
	std::shared_ptr<ElementMeta> oldest = m_RQue->GetFirstElement();
	// GetFirstElement yields null if the queue was emptied in the meantime;
	// do not park a null entry in m_FQue.
	if (oldest)
	{
		// Result intentionally ignored: if m_FQue is full the node is not
		// stored and is released when `oldest` goes out of scope.
		m_FQue->AppendElement(oldest);
	}
	return m_RQue->AppendElement(elem);
}


// Remembers the two queues this operator works on; neither is owned.
QueForC::QueForC(Queue* fq, Queue* rq)
	: m_FQue(fq)
	, m_RQue(rq)
{
}

// Takes the front node of m_RQue; the result is null when that queue is empty.
std::shared_ptr<ElementMeta> QueForC::PullElementFromQue()
{
	std::shared_ptr<ElementMeta> node = m_RQue->GetFirstElement();
	return node;
}

// Hands `elem` to m_FQue.
// Returns the result of the append: 0 on success, -1 when m_FQue is full
// (the node is then not stored). The previous version returned -1
// unconditionally, so callers could never observe a successful push.
int QueForC::PushElementToQue(const std::shared_ptr<ElementMeta>& elem)
{
	return m_FQue->AppendElement(elem);
}

生产者代码示例:

//Producer.h

#pragma once

class QueOpreator;
class Queue;

/// Producing side of the pipeline.
/// Init() must be called before Process(), which dereferences m_Que.
class Producer
{
public:
	/// Creates the queue operator used by Process() from the two queues.
	void Init(Queue* rq, Queue* fq);
	/// Produces one message and pushes it through the queue operator.
	void Process();

private:
	// NOTE(review): Init() allocates this with `new` and nothing in this class
	// releases it; confirm the intended ownership.
	QueOpreator* m_Que{nullptr};
};


//Producer.cpp

#include "Producer.h"

#include <cstdio>
#include <cstring>

#include "CQModel.h"

// Builds the producer-side queue operator.
// NOTE(review): QueForP's constructor is declared as (fq, rq), so the queue
// received here as `rq` is stored in m_FQue and `fq` in m_RQue.
// Consumer::Init forwards its arguments in the same order, so both sides stay
// consistent with each other; change both together or neither.
void Producer::Init(Queue* rq, Queue* fq)
{
	QueOpreator* const op = new QueForP(rq, fq);
	m_Que = op;
}

// Sequence number embedded in each produced message.
static int id = 0;

// Produces one message: obtains a node through the queue operator (or creates
// a fresh one when none is available), writes "msg id =<n>." into its buffer,
// prints it and pushes the node back through the queue operator.
void Producer::Process()
{
	if (m_Que == nullptr)
	{
		return; // Init() has not been called; nothing to work with
	}

	std::shared_ptr<ElementMeta> data = m_Que->PullElementFromQue();
	if (!data)
	{
		data = std::make_shared<ElementMeta>();
	}

	const unsigned int kBufLen = 100;
	char* text = nullptr;
	if (!data->IsInitialized())
	{
		// Fresh node: allocate its buffer and register a matching deleter.
		text = new char[kBufLen];
		std::snprintf(text, kBufLen, "msg id =%d.", id++);
		// The buffer was allocated as char[], so it must be deleted as char[];
		// delete[] on a void* is undefined behaviour.
		data->InitElementMeta(text, kBufLen,
			[](void* p, unsigned int) { delete[] static_cast<char*>(p); });
	}
	else
	{
		// Recycled node: overwrite the existing buffer in place.
		const void* d = nullptr;
		unsigned int len = 0;
		data->GetElementMeta(&d, &len);
		text = static_cast<char*>(const_cast<void*>(d));
		std::memset(text, 0x00, len);
		// snprintf writes at most len - 1 characters plus the terminator.
		std::snprintf(text, len, "msg id =%d.", id++);
	}
	// ... Process ...
	std::printf("Genernate: %s.\n", text);
	m_Que->PushElementToQue(data);
}

消费者代码示例:

// Consumer.h

#pragma once

class QueOpreator;
class Queue;

/// Consuming side of the pipeline.
/// Init() must be called before Process(), which dereferences m_Que.
class Consumer
{
public:
	/// Creates the queue operator used by Process() from the two queues.
	void Init(Queue* rq, Queue* fq);
	/// Consumes one message, if any, and returns its node through the queue operator.
	void Process();

private:
	// NOTE(review): Init() allocates this with `new` and nothing in this class
	// releases it; confirm the intended ownership.
	QueOpreator* m_Que{nullptr};
};


//Consumer.cpp

#include "Consumer.h"
#include "CQModel.h"

// Builds the consumer-side queue operator.
// NOTE(review): QueForC's constructor is declared as (fq, rq), so the queue
// received here as `rq` is stored in m_FQue and `fq` in m_RQue.
// Producer::Init forwards its arguments in the same order, so both sides stay
// consistent with each other; change both together or neither.
void Consumer::Init(Queue* rq, Queue* fq)
{
	QueOpreator* const op = new QueForC(rq, fq);
	m_Que = op;
}

// Consumes one message: pulls a node through the queue operator, prints its
// text and pushes the node back. Does nothing when no usable node is available.
void Consumer::Process()
{
	const std::shared_ptr<ElementMeta> node = m_Que->PullElementFromQue();
	// A null result or a node without an attached buffer is a dummy: skip it.
	const bool usable = node && node->IsInitialized();
	if (!usable)
	{
		return;
	}
	// .... Process ...
	const void* payload = nullptr;
	unsigned int payload_len = 0;
	node->GetElementMeta(&payload, &payload_len);
	printf("Consume: %s.\n", static_cast<const char*>(payload));
	m_Que->PushElementToQue(node);
}

 

客户端使用:

#include "Consumer.h"
#include "Producer.h"
#include "CQModel.h"
#include <thread>

// Consumer thread body: consumes one message every five seconds, forever.
void ThreadFunc(Queue* rq, Queue*fq)
{
    Consumer consumer;
    consumer.Init(rq, fq);
    const std::chrono::milliseconds pause(5000); // simulates a slow consumer
    for (;;)
    {
        std::this_thread::sleep_for(pause);
        consumer.Process();
    }
}

int main()
{
    Queue rq(1024), fq(1024);
    std::thread t1(&ThreadFunc,&rq,&fq);
    Producer p;
    p.Init(&rq, &fq);
    while (1)
    {
        p.Process();
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));//生产者生产速度较快
    }
    t1.join();
    return 0;
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部