一、定义    

        信号量,从本质上来说,它是一种特殊的计数器。其核心作用在于对临界资源中资源数量进行精确的描述与把控。以电影院为例,观众们会竞相选择不同的座位,而这些座位作为共享资源,被拆分成了多个独立的部分。只要同时观影的线程(观众)数量未超出座位数量,便能确保多线程(多观众)对这一临界资源(电影院座位)的并发访问得以顺利实现。

        在信号量的机制中,极为关键的一点是要保证 PV 操作的原子性。这里的计数器,其本质上就代表着临界资源的实际数量。通过一种巧妙的设计,将资源是否就绪的判断放置在临界区之外,这样一来,一旦线程能够进入相关区域,就意味着资源必然是就绪可用的。当线程申请信号量时,实际上已经间接地完成了对资源可用性的判断,这种设计极大地提高了资源利用的效率与安全性,避免了因资源竞争而可能引发的混乱与错误。

二、基于环形队列的生产消费模型

        在基于环形队列的生产消费模型中,有着独特的运行规则与特性。当环形队列处于不空且不满的状态时,生产者与消费者所指向的并非同一块空间,这种设计为二者的并发操作提供了可能。只有在队列空或者满的特殊情况下,它们才会指向同一个位置。

我们可以将其类比为一场特殊的追逐游戏,在这个游戏中有三个必须满足的条件:

1、当生产者与消费者指向同一个位置,同一时刻只能有一方进行访问

当队列空时:生产者访问

当队列满时:消费者

2、消费者不能超过生产者

3、生产者不能把消费者套个圈,即不能在消费者尚未消费完之前,过度生产数据而导致数据覆盖或者逻辑混乱。

        在这个模型中,生产者主要关注环形队列还有多少剩余空间,用 SpaceSem (N)表示;而消费者则侧重于关注队列中还有多少剩余数据,用 DataSem(0) 表示。通过这种方式,双方能够依据信号量的指示,有条不紊地进行生产与消费活动,实现整个系统的高效稳定运行。

三、代码如下:

main.cpp

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"

using namespace std;

struct ThreadData
{
    RingQueue<Task> *rq;
    std::string threadname;
};

void *Productor(void *args)
{
    // sleep(3);
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    int len = opers.size();
    while (true)
    {
        // 1. 获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 2. 生产数据
        rq->Push(t);
        cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;

        sleep(1);
    }
    return nullptr;
}

void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;

    while (true)
    {
        // 1. 消费数据
        Task t;
        rq->Pop(&t);
       
        // 2. 处理数据
        t();
        cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
        // sleep(1);

    }
    return nullptr;
}

int main()
{
    srand(time(nullptr) ^ getpid());
    RingQueue<Task> *rq = new RingQueue<Task>(50);

    pthread_t c[5], p[3];

    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Productor-" + std::to_string(i);

        pthread_create(p + i, nullptr, Productor, td);
    }
    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(c + i, nullptr, Consumer, td);
    }

    for (int i = 0; i < 1; i++)
    {
        pthread_join(p[i], nullptr);
    }
    for (int i = 0; i < 1; i++)
    {
        pthread_join(c[i], nullptr);
    }

    return 0;
}
RingQueue.hpp

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>

const static int defaultcap = 5;

template<class T>
class RingQueue{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
    {
        sem_init(&cdata_sem_, 0, 0);
        sem_init(&pspace_sem_, 0, cap);

        pthread_cond_init(&c_mutex_, nullptr);
        pthread_cond_init(&p_mutex_, nullptr);
    }
    void Push(const T &in) // 生产
    {
        P(pspace_sem_);

        Lock(p_mutex_); // ?
        ringqueue_[p_step_] = in;
        // 位置后移,维持环形特性
        p_step_++;
        p_step_ %= cap_;
        Unlock(p_mutex_); 

        V(cdata_sem_);

    }
    void Pop(T *out)       // 消费
    {
        P(cdata_sem_);

        Lock(c_mutex_); // ?
        *out = ringqueue_[c_step_];
        // 位置后移,维持环形特性
        c_step_++;
        c_step_ %= cap_;
        Unlock(c_mutex_); 

        V(pspace_sem_);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);

        pthread_mutex_destroy(&c_mutex_);
        pthread_mutex_destroy(&p_mutex_);
    }
private:
    std::vector<T> ringqueue_;
    int cap_;

    int c_step_;       // 消费者下标
    int p_step_;       // 生产者下标

    sem_t cdata_sem_;  // 消费者关注的数据资源
    sem_t pspace_sem_; // 生产者关注的空间资源

    pthread_cond_t c_mutex_;
    pthread_cond_t p_mutex_;
};
Task.hpp

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};
makefile

RingQueueTest:Main.cpp
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f RingQueueTest

 执行结果

 

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部