一、【写在前面】

最近csdn每天写两篇文章有推广券,趁这个机会写一个python相关的文章吧。

一般我们的任务都可以分为计算密集型任务和IO密集型任务。

python因为全局GIL锁的存在,任何时候只有一个python线程在运行,所以说不能利用多核CPU的优势,这意味着python做计算密集型任务不是很合适。如果提升多核能力,使用multiprocess库或者其他语言写的库

但是在IO密集型任务中,在对速度和效率要求比较高的场景中,我们可以将python的多线程功能用起来。但是多线程必定bug多,这个是不可避免的,所以笔者也只敢在操作影响不大的地方写多线程

这篇文章写一个尽可能通用的python多线程非阻塞的代码示例

二、【名词解释】

1. 多线程

一个进程中可以有多个线程,好比饭店如果只有一个人,炒菜端菜都是一个人干,这就是单线程;

一个饭店有人切菜有人炒菜有人端菜,这就是三线程。

2. 线程之间的通信

而切菜的人把菜丢给炒菜的人,炒菜的人做好丢给端菜的人,这个过程叫做线程之间的通信

3. 线程阻塞

如果切菜的人是个浑浊懵愣的货,切完一个菜之后,厨师没有将菜收走,案板满了。这个切菜的人不愿意干活了,这就是阻塞。

4. 线程池

饭店老板非常有钱,每次有新的工作都会重新招一个人来干,这就叫创建新的线程。

饭店老板如果比较精明,有了新活都抓饭店里闲着的人干,而不招新的人,这就叫线程池。

5. 锁

饭店里只有一个卫生间,如果大家一起上卫生间会有人害羞而尿不出来。所以在员工上卫生间的时候需要加锁。程序中一般是在访问公共变量的时候需要加锁。

6. 线程导致的主程序退出

切菜的师傅切了个炸弹,然后厨师没有确认直接开炒,整个饭店都炸掉了。一般来说线程崩溃就崩溃了,但是如果主程序的某个流程是依赖线程结果的,那么线程崩溃就会把主程序带崩。

三、【代码编写】

这段代码是针对多台机器,每台机器的任务类似的一个多线程的主函数。照着这个往里塞东西就行

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def func1():
    pass

def func2():
    pass

def func3():
    pass

if __name__ == "__main__":
    # init global vars
    vars_dict = {}
    futures = {}
    
    # init locker 线程共用的变量要加锁否则会有奇奇怪怪的问题
    vars_dict_lock = threading.Lock()
    futures_lock = threading.Lock()
    
    # machine list 考虑有机器无法连接的情况
    Machine_List = ['machine1','machine2','machine3','cant_connected_machine']
    
    # prometheus metrix init 针对每个机器都有的变量,避免冗余代码可以这样初始化
    for host in Machine_List:
        ## trading flag monitor below
        vars_dict[f'var1_{host}']  = 'var1'
        vars_dict[f'var2_{host}']= 'var2'
        ## end trading flag mon
        
    # 定义任务函数映射
    task_functions = {
        'describe for func1': func1,
        'describe for func2': func2,
        'describe for func3':func3
    }

    # multi thread task 线程池 10线程
    with ThreadPoolExecutor(max_workers=10) as executor:
        while True:
            with futures_lock:  # 每次操作公共变量需要操作锁
                for host in Machine_List:
                    # 分别为每个任务创建一个唯一的键
                    for task_name, task_func in task_functions.items():
                        task_key = f"{task_name}_{host}"
                        # 如果线程没有创建或没有完成,防止没跑完就起新的
                        if task_key not in futures or futures[task_key].done():
                            futures[task_key] = executor.submit(task_func, host)
                time.sleep(30)
    

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部