个人主页:C++忠实粉丝
欢迎 点赞 收藏 留言 加关注本文由 C++忠实粉丝 原创

Linux系统基础-多线程超详细讲解(5)_单例模式与线程池

收录于专栏[Linux学习]
本专栏旨在分享学习Linux的一点学习笔记,欢迎大家在评论区交流讨论

目录

书接上回  -> 实现基于环形队列的生产消费模型

Ring Queue.hpp

Task.hpp

Main.cc

1. 单例模式

什么是单例模式

什么是设计模式

单例模式的特点

实现单例模式的方法

饿汉实现方式和懒汉实现方式

饿汉实现单例模式

懒汉实现单例模式

2. STL, 智能指针和线程安全

STL中的容器是否是线程安全的?

智能指针是否线程安全?

3. 其他常见的各种锁

4. 线程池的实现

线程池介绍 :

线程池的应用场景 :

线程池的示例 :

线程池的实现 :

Lock Guard.hpp 

Log.hpp

Thread.hpp

Task.hpp

ThreadPool.hpp

Main.cc

 效果展示 :


书接上回  -> 实现基于环形队列的生产消费模型

环形队列采用数组模拟,用模运算来模拟环状特性 

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态

 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

Ring Queue.hpp

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <semaphore.h>

template <typename T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }
    void V(sem_t &s)
    {
        sem_post(&s);
    }
public:
    RingQueue(int max_cap)
        : _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0)
    {
        sem_init(&_data_sem, 0, 0);
        sem_init(&_space_sem, 0, max_cap);

        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }
    void Push(const T &in) //生产者
    {
        //信号量 : 是一个计数器, 是资源的预定机制, 预定 : 在外部, 可以不判断资源时候满足, 就可以知道内部资源的情况
        P(_space_sem);//信号量这里, 对资源进行使用, 申请, 为什么不判断一下条件是否满足???信号量本身就是判断条件
        pthread_mutex_lock(&_p_mutex);
        _ringqueue[_p_step] = in;
        _p_step++;
        _p_step %= _max_cap;
        pthread_mutex_unlock(&_p_mutex);
        V(_data_sem);
    }
    void Pop(T *out)
    {
        P(_data_sem);
        pthread_mutex_lock(&_c_mutex);
        *out = _ringqueue[_c_step];
        _c_step %= _max_cap;
        pthread_mutex_unlock(&_c_mutex);
        V(_space_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&_data_sem);
        sem_destroy(&_space_sem);
        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }   
private:
    std::vector<T> _ringqueue;
    int _max_cap;
    int _c_step;
    int _p_step;
    sem_t _data_sem;//消费者关心
    sem_t _space_sem;//生产者关心
    pthread_mutex_t _c_mutex;
    pthread_mutex_t _p_mutex;
};

类成员变量 : 

_ringqueue : 存储队列元素的动态数组, 使用 std::vector 来实现

_max_cap : 队列的最大容量

_c_step : 消费者指针, 指向下一个要被消费的元素的位置

_p_step : 生产者指针, 指向下一个要被生产的元素的位置

_data_sem : 计数信号量, 表示队列中可供消费的元素数量

_space_sem : 计数信号量, 表示队列中可供生产的空间数量

_c_mutex : 互斥锁, 用于保护消费者操作

_p_mutex : 互斥锁, 用于保护生产者操作

构造函数 :

RingQueue(int max_cap) : 初始化环形队列的最大容量, 并设置初始值, 初始化信号量和互斥锁

sem_init : 用于初始化信号量, _data_sem 初始化为0, 因为队列开始为空, _space_sem 初始化为 max_cap :  因为队列一开始是空的.

pthread_mutex_init : 初始化互斥锁

生产者方法 :

功能 : 向队列中添加一个元素

1. 调用 P(_space_sem), 等待有空余空间 (空间信号量减少)

2. 加锁 _p_mutex, 确保线程安全

3. 将输入元素存储到 _ringqueue 的 _p_step 位置

4. 更新 _p_step , 并使用取模操作确保它在容量范围内

5. 解锁 _p_mutex

6. 调用 V(_data_sem), 增加可用元素的计数

消费者方法 :

功能 : 从队列中取出一个元素.

1. 调用 P(_data_sem) , 等待有可用数据 (数据信号量减少)

2. 加锁 _c_mutex , 确保线程安全

3. 将 _ringqueue 中, _c_step 位置的元素赋给 out

4. 更新 _c_step, 并使用取模操作确保它在容量范围内

5. 解锁 _c_mutex

6. 调用 V(_space_sem), 增加可用空间的计数

析构函数 :

Task.hpp

#pragma once

#include <iostream>
#include <functional>

class Task
{
public:
    Task()
    {
    }
    Task(int x, int y) : _x(x), _y(y)
    {
    }
    void Excute()
    {
        _result = _x + _y;
    }
    void operator()()
    {
        Excute();
    }
    std::string debug()
    {
        std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
        return msg;
    }
    std::string result()
    {
        std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
        return msg;
    }

private:
    int _x;
    int _y;
    int _result;
};

Main.cc

#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>

void *Consumer(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        Task t;
        // 1. 消费
        rq->Pop(&t);
        //2. 处理数据
        t();
        std::cout << "Consumer->" << t.result() << std::endl;
    }
}

void *Productor(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);

    while(true)
    {
        sleep(1);
        //1. 构造数据
        int x = rand() % 10 + 1;//[1, 10]
        usleep(x * 1000);
        int y = rand() % 10 + 1;
        Task t(x, y);

        //2. 生产
        rq->Push(t);

        std::cout << "Productor ->" << t.debug() << std::endl;
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    RingQueue<Task> *rq = new RingQueue<Task>(5);

    pthread_t c1, c2, p1, p2, p3;
    pthread_create(&c1, nullptr, Consumer, rq);
    pthread_create(&c2, nullptr, Consumer, rq);
    pthread_create(&p1, nullptr, Productor, rq);
    pthread_create(&p2, nullptr, Productor, rq);
    pthread_create(&p3, nullptr, Productor, rq);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);

    return 0;
}

效果展示 :

1. 单例模式

什么是单例模式

单例模式是一种 "很经典, 常用的, 常考的" 设计模式

什么是设计模式

IT 行业这么火, 涌入的人很多, 俗话说林子大了什么鸟都有, 大佬和菜鸡们两极分化越来越严重, 为了让菜鸡们不太拖大佬的后退, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式!

单例模式的特点

某些类, 只应该具有一个对象 (实例), 就称之为单例

例如一个男人只能有一个媳妇.

在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中, 此时往往要用一个单例的类来管理这些数据

实现单例模式的方法

饿汉实现方式和懒汉实现方式

吃完饭, 立即洗碗, 这种就是饿汉方式, 因为下一顿吃的时候可以立刻拿着碗就能吃饭

吃完饭, 先把碗放下, 然后下一顿用到这个碗了再洗碗, 就是懒汉模式

懒汉方式最核心的思想是 "延时加载", 从而能够优化服务器的启动速度

饿汉实现单例模式

template <typename T>
class Singleton
{
    static T data;

public:
    static T *GetInstance()
    {
        return &data;
    }
};

只要通过 SingLeton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例. 

懒汉实现单例模式

template <typename T>
class Singleton
{
    static T *inst;

public:
    static T *GetInstance()
    {
        if (inst == NULL)
        {
            inst = new T();
        }
        return inst;
    }
};

存在一个严重的问题, 线程不安全 , 第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象实例. 但是后续再次调用, 就没有问题了

改良版  :

// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
    volatile static T *inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
    static std::mutex lock;

public:
    static T *GetInstance()
    {
        if (inst == NULL)
        {                // 双重判定空指针, 降低锁冲突的概率, 提高性能.
            lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
            if (inst == NULL)
            {
                inst = new T();
            }
            lock.unlock();
        }
        return inst;
    }
};

注意事项 : 

1. 加锁解锁的位置

2. 双重 if 判定, 避免不必要的锁竞争

3. volatile 关键字防止过度优化 

2. STL, 智能指针和线程安全

STL中的容器是否是线程安全的?

不是, 原因是 STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大影响. 而且对于不同的容器, 加锁方式的不同, 性能可能也不同 (例如 hash 表的加锁和锁桶)

因此 STL 默认不是线程安全, 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.

智能指针是否线程安全?

对于 unique_ptr , 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.

对于 shared_ptr , 多个对象需要公用一个引用计数变量, 所以会存在线程安全的问题, 但是标准库实现的时候考虑到了这个问题, 基于原子操作 (CAS) 的方式保证了 shared_ptr 能够高效, 原子的操作引用计数

3. 其他常见的各种锁

悲观锁 : 在每次取数据时, 总是担心数据会被其他线程修改, 所以会在取数据前先加锁 (读锁, 写锁, 行锁), 当其他线程想要访问数据时, 被阻塞挂起

乐观锁 : 每次取数据时候, 总是乐观的认为数据不会被其他线程修改, 因此不上锁, 但是在跟新数据前, 会判断其他数据在跟新前有没有对数据进行修改, 主要采用两种方式, 版本号机制和CAS 操作

CAS 操作 : 当需要更新数据时, 判断当前内存值和之前取得的值时候相等, 如果相等则用新值更新, 若不相等则失败, 失败则重试, 一般是一个自旋的过程, 即不断重试

还有自旋锁, 公平锁, 非公平锁

4. 线程池的实现

线程池介绍 :

线程池 : 一种线程使用模式, 线程过多会带来调度开销, 进而影响缓存局部性和整体性能, 而线程池维护着多个线程, 等待着监督管理者分配可并发执行的任务,这避免了在处理短时间任务时创建于销毁线程的代价, 线程池不仅能够保证内核的充分利用, 还能防止过分调度, 可用线程数量应该取决于可以的并发处理器, 处理内核, 内存, 网络socket等的数量

线程池的应用场景 :

1. 需要大量的线程完成任务,且完成任务的时间比较短, WEB 服务器完成网页请求这样的任务, 使用线程技术是非常适合的,因为单个任务小,任务数量巨大, 你可以想象一个热门网站的点击次数, 但对于长时间的任务, 比如一个 Telnet 连接请求, 线程池的优点就不明显了, 因为 Telnet 会话时间比线程创建时间大多了

2. 对性能要求苛刻的应用, 比如要求服务器迅速相应客户请求

3. 接受突发性的大量请求, 但不至于使服务器因此产生大量线程的应用,突发性大量客户请求, 在没有线程池情况下, 将产生大量线程, 虽然理论上大部分操作系统线程数目最大值不是问题, 短时间内产生大量线程可能使内存到达极限,出现错误

线程池的示例 :

1. 创建固定数量线程池, 循环从任务队列中获取任务对象

2. 获取任务对象后, 执行任务对象中的任务接口

线程池的实现 :

Lock Guard.hpp 

#pragma once

#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t *_mutex;
};

这段代码定义了一个名为 LockGuard 的类, 它用于通过 RAII 机制来管理互斥锁 (mutex) 的加锁与解锁, LockGuard 类的设计思想与 C++ 标准库中的 std::lock_guard 类似, 但它是为了使用 POSIX 线程库 (pthreads) 中的 pthread_mutex_t 类型实现的互斥锁而实现的 

这个构造函数接受一个 pthread_mutex_t* 类型的指针 mutex, 并在构造时执行以下操作 : 

1. 将传入的 mutex 指针保存到类的成员变量 _mutex 中.

2. 调用 pthread_mutex_lock(_mutex) 来加锁, 确保在进入临界区时互斥锁访问共享资源.

pthread_mutex_lock 会尝试获取互斥锁, 如果锁已经被其他线程持有, 则当线程会被阻塞, 直到该锁被释放, 这个加锁操作是 LockGuard 的关键功能, 它保证了对象创建时锁自动加上

析构函数在 LockGuard 对象生命周期结束时调用, 当 LockGuard 对象销毁时, 它自动解锁互斥锁, 调用 pthread_mutex_unlock(_mutex), 这使得锁的释放成为自动, 无需显示调用解锁代码

Log.hpp

#pragma once

#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "Lock Guard.hpp"

namespace log_ns
{

    enum
    {
        DEBUG = 1,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    std::string LevelToString(int level)
    {
        switch (level)
        {
        case DEBUG:
            return "DEBUG";
        case INFO:
            return "INFO";
        case WARNING:
            return "WARNING";
        case ERROR:
            return "ERROR";
        case FATAL:
            return "FATAL";
        default:
            return "UNKNOWN";
        }
    }

    std::string GetCurrTime()
    {
        time_t now = time(nullptr);
        struct tm *curr_time = localtime(&now);
        char buffer[128];
        snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
                 curr_time->tm_year + 1900,
                 curr_time->tm_mon + 1,
                 curr_time->tm_mday,
                 curr_time->tm_hour,
                 curr_time->tm_min,
                 curr_time->tm_sec);
        return buffer;
    }

    class logmessage
    {
    public:
        std::string _level;
        pid_t _id;
        std::string _filename;
        int _filenumber;
        std::string _curr_time;
        std::string _message_info;
    };

#define SCREEN_TYPE 1
#define FILE_TYPE 2

    const std::string glogfile = "./log.txt";
    pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;

    // log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );
    class Log
    {
    public:
        Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
        {
        }
        void Enable(int type)
        {
            _type = type;
        }
        void FlushLogToScreen(const logmessage &lg)
        {
            printf("[%s][%d][%s][%d][%s] %s",
                   lg._level.c_str(),
                   lg._id,
                   lg._filename.c_str(),
                   lg._filenumber,
                   lg._curr_time.c_str(),
                   lg._message_info.c_str());
        }
        void FlushLogToFile(const logmessage &lg)
        {
            std::ofstream out(_logfile, std::ios::app);
            if (!out.is_open())
                return;
            char logtxt[2048];
            snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
                     lg._level.c_str(),
                     lg._id,
                     lg._filename.c_str(),
                     lg._filenumber,
                     lg._curr_time.c_str(),
                     lg._message_info.c_str());
            out.write(logtxt, strlen(logtxt));
            out.close();
        }
        void FlushLog(const logmessage &lg)
        {
            // 加过滤逻辑 --- TODO

            LockGuard lockguard(&glock);
            switch (_type)
            {
            case SCREEN_TYPE:
                FlushLogToScreen(lg);
                break;
            case FILE_TYPE:
                FlushLogToFile(lg);
                break;
            }
        }
        void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
        {
            logmessage lg;

            lg._level = LevelToString(level);
            lg._id = getpid();
            lg._filename = filename;
            lg._filenumber = filenumber;
            lg._curr_time = GetCurrTime();

            va_list ap;
            va_start(ap, format);
            char log_info[1024];
            vsnprintf(log_info, sizeof(log_info), format, ap);
            va_end(ap);
            lg._message_info = log_info;

            // 打印出来日志
            FlushLog(lg);
        }
        ~Log()
        {
        }

    private:
        int _type;
        std::string _logfile;
    };

    Log lg;

#define LOG(Level, Format, ...)                                          \
    do                                                                   \
    {                                                                    \
        lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
    } while (0)
#define EnableScreen()          \
    do                          \
    {                           \
        lg.Enable(SCREEN_TYPE); \
    } while (0)
#define EnableFILE()          \
    do                        \
    {                         \
        lg.Enable(FILE_TYPE); \
    } while (0)
};

这里实现了一个简单的日志系统, 用于程序中进行日志记录, 日志可以打印到屏幕或写入文件中, 以下是日志的详细分析 :

1. 日志等级枚举和转换函数

代码这里枚举了5个日志等级 : DEBUG, INFO, WARNING, ERROR, FATAL, 这些等级用于标识日志的严重性

LevelToString 函数根据日志等级返回对应的字符串表示, 如果传入的等级不在预定义的范围内, 返回 "UNKNOWN"

2. 获取当前时间

GetCurrTime 返回当前时间, 格式为 YYYY-MM-DD HH:MM:SS, 通过调用 time()localtime() 获取当前时间, 并格式化为字符串返回

3. 日志消息结构体

logmessage 结构体存储了每条日志的相关信息 :

_level : 日志等级

_id : 当前进程 ID, 使用 getpid() 获取

_filename : 日志来源的文件名

_filenumber : 日志来源的行号

_curr_time : 当前时间 (字符串格式)

_message_info : 日志的具体内容 (格式化后的日志信息)  

4. 日志类型常量

SCREEN_TYPE : 日志输出到屏幕

FILE_TYPE : 日志输出到文件 

5. 全局日志文件和互斥锁

glogfile 定义了日志文件的路径 (就是在当前目录下生成 log.txt 文件)

glock 是一个互斥锁, 用于保护日志的多线程安全, 通过 pthread_mutex_t LockGuard 来确保多线程环境下日志文件的安全访问 

6. 日志类 

构造函数初始化日志文件路径并设置默认日志输出类型为屏幕输出 (SCREEN_TYPE).

Enable 函数用于设置日志输出类型 (屏幕或文件)

FlushLogToScrren 将日志消息打印到屏幕

FlushLogToFile 将日志消息写入文件, 它以追加模式打开文件, 格式化日志信息后写入文件

FlushLog 是主逻辑函数, 负责根据 _type 决定将日志输出到屏幕还是文件, 并在此过程中使用 LockGuard 保护日志的写操作, 避免多线程的竞争条件.

logMessage 是日志的主要接口, 接受文件名, 行号, 日志等级, 格式化字符串和变参信息, 将其组合成 logmessage 对象后调用 Flushlog 输出日志 

7. 宏定义

LOG 宏是用户记录日志的入口, 自动传递当前文件名 (__FILE__) 和行号 (__LINE__), 以及用户提供的日志等级和格式化字符串

EnableScreen EnableFILE 用于设置日志输出类型, 分别为屏幕输出和文件输出

Thread.hpp

#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>

namespace ThreadMoudle
{
    using func_t = std::function<void(const std::string &)>;

    class Thread
    {
    public:
        void Excute()
        {
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }

    public:
        Thread(const std::string &name, func_t func) : _name(name), _func(func)
        {
        }
        static void *ThreadRoutine(void *args) // 新线程都会执行该方法
        {
            Thread *self = static_cast<Thread *>(args); // 获取当前对象
            self->Excute();
            return nullptr;
        }

        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if (n != 0)
                return false;
            return true;
        }

        std::string Status()
        {
            if (_isrunning)
                return "running";
            else
                return "sleep";
        }

        void Stop()
        {
            if (_isrunning)
            {
                ::pthread_cancel(_tid);
                _isrunning = false;
            }
        }

        void Join()
        {
            ::pthread_join(_tid, nullptr);
        }
        std::string Name()
        {
            return _name;
        }
        ~Thread()
        {
        }

    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func; // 线程要执行的回调函数
    };
} // namespace ThreadModle

这个自己封装的线程类在上一章有详细介绍, 这里就不再多说~~, 想看的宝子们可以去这个专栏翻一下 

Task.hpp

#pragma once

#include <iostream>
#include <functional>

class Task
{
public:
    Task()
    {}
    Task(int x, int y) :_x(x), _y(y)
    {}
    void Excute()
    {
        _result = _x + _y;
    }
    void operator()()
    {
        Excute();
    }
    std::string debug()
    {
        std::string msg = std::to_string(_x) + "x" + std::to_string(_y) + "= ?";
        return msg;
    }
    std::string result()
    {
        std::string msg = std::to_string(_x) + "x" + std::to_string(_y) + "=" + std::to_string(_result); 
        return msg;
    }
private:
    int _x;
    int _y;
    int _result;
};

ThreadPool.hpp

#pragma once

#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "Lock Guard.hpp"

using namespace ThreadMoudle;
using namespace log_ns;

static const int gdefaultnum = 5;

void test()
{
    while (true)
    {
        std::cout << "hello world" << std::endl;
        sleep(1);
    }
}

template <typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void WakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    bool IsEmpty()
    {
        return _task_queue.empty();
    }
    void HandlerTask(const std::string &name) // this
    {
        while (true)
        {
            // 取任务
            LockQueue();
            while (IsEmpty() && _isrunning)
            {
                _sleep_thread_num++;
                LOG(INFO, "%s thread sleep begin!\n", name.c_str());
                Sleep();
                LOG(INFO, "%s thread wakeup!\n", name.c_str());
                _sleep_thread_num--;
            }
            // 判定一种情况
            if (IsEmpty() && !_isrunning)
            {
                UnlockQueue();
                LOG(INFO, "%s thread quit\n", name.c_str());
                break;
            }

            // 有任务
            T t = _task_queue.front();
            _task_queue.pop();
            UnlockQueue();

            // 处理任务
            t(); // 处理任务,此处不用/不能在临界区中处理
            // std::cout << name << ": " << t.result() << std::endl;
            LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());
        }
    }
    void Init()
    {
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i++)
        {
            std::string threadname = "thread-" + std::to_string(i + 1);
            _threads.emplace_back(threadname, func);
            LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());
        }
    }
    void Start()
    {
        _isrunning = true;
        for (auto &thread : _threads)
        {
            LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
            thread.Start();
        }
    }
    ThreadPool(int thread_num = gdefaultnum)
        : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    ThreadPool(const ThreadPool<T> &) = delete;
    void operator=(const ThreadPool<T> &) = delete;

public:
    void Stop()
    {
        LockQueue();
        _isrunning = false;
        WakeupAll();
        UnlockQueue();
        LOG(INFO, "Thread Pool Stop Success!\n");
    }

    // 如果是多线程获取单例呢?
    static ThreadPool<T> *GetInstance()
    {
        if (_tp == nullptr)
        {
            LockGuard lockguard(&_sig_mutex);
            if (_tp == nullptr)
            {
                LOG(INFO, "create threadpool\n");
                // thread-1 thread-2 thread-3....
                _tp = new ThreadPool();
                _tp->Init();
                _tp->Start();
            }
            else
            {
                LOG(INFO, "get threadpool\n");
            }
        }
        return _tp;
    }

    void Equeue(const T &in)
    {
        LockQueue();
        if (_isrunning)
        {
            _task_queue.push(in);
            if (_sleep_thread_num > 0)
                Wakeup();
        }
        UnlockQueue();
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    int _thread_num;
    std::vector<Thread> _threads;
    std::queue<T> _task_queue;
    bool _isrunning;

    int _sleep_thread_num;

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    // 单例模式
    // volatile static ThreadPool<T> *_tp;
    static ThreadPool<T> *_tp;
    static pthread_mutex_t _sig_mutex;
};

template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;

1. 成员变量

_thread_num : 指定线程池的线程数, 默认为 gdefaultnum (5个线程)

_threads : 一个存储所有工具线程的指针 (std::vector<Thread>), 每个线程负责处理任务队列中的任务

_task_queue : 一个 std::queue<T>, 存储待处理的任务, 任务类型由模板参数 T 决定

_isrunning : 一个布尔值, 指示线程池是否正在运行

_sleep_thread_num : 记录当前处于休眠状态的线程数

_mutex : 互斥锁, 用于保护任务队列的访问

_cond : 条件变量, 用于线程的的等待和唤醒

_tp : 一个静态成员, 指向线程池的单例实例

_sig_mutex : 一个静态互斥锁, 用于保护 GetInstance() 方法中的单例创建 

2. 私有成员函数

LockQueue/UnlockQueue

这些函数用于锁定和解锁 _mutex, 确保在访问共享资源 (任务队列 _task_queue) 时线程安全

Wakeup/WakeAll

这两个函数分别用来唤醒一个线程或所有线程, 条件标量 _cond 配合使用, 在任务队列不为空时唤醒线程.

Sleep 

当任务队列为空且线程池仍在运行时, 线程会调用 Sleep() 进入等待状态, 直到新的任务到来或者线程池被停止.

IsEmpty

检查任务队列是否为空

HandlerTask

每个线程都会运行此函数来处理任务, 它会不断从任务队列中取出任务并执行, 如果队列为空, 它会让线程进入休眠状态, 直到新的任务到来

Init

用于初始化线程池, 根据 _thread_num 创建相应数量的线程, 并为每个线程绑定任务处理函数

Start

启动线程池中的所有线程, 线程开始执行 HeadlerTask 函数 

Stop

停止线程池的所有线程, 通过设置 _isrunning 为 false 并唤醒所有线程, 所有正在等待的线程将会退出

Equeue

将任务添加到任务队列中, 如果线程池正在运行, 且有线程处于睡眠状态, 则会唤醒一个线程开始处理任务. 

3. 单例模式 

ThreadPool 类使用了懒汉模式, 确保在多线程环境中只有一个线程池实例, 为了避免多个线程的同时创建线程池实例, 使用了 pthread_mutex_t _sig_mutex 来加锁 

4. 析构函数

在析构函数中, 销毁了互斥锁和条件标量

5. 静态成员变量初始化

_tp 用来存储线程池的唯一实例, _sig_mutex 是用于同步创建实例的互斥锁 

Main.cc

#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
// #include <memory>

using namespace log_ns;

int main()
{
    EnableScreen();
    // std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>(); //c++14
    // ThreadPool<Task> *tp = new ThreadPool<Task>();
    // tp->Init();
    // tp->Start();
    int cnt = 10;
    while(cnt)
    {
        // 不断地向线程池推送任务
        sleep(1);
        Task t(1,1);
        ThreadPool<Task>::GetInstance()->Equeue(t);
        LOG(INFO, "equeue a task, %s\n", t.debug().c_str());
        sleep(1);
        cnt--;
    }

    ThreadPool<Task>::GetInstance()->Stop();
    LOG(INFO, "thread pool stop!\n");
    return 0;
}

 效果展示 :

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部