python 多进程任务调度

作者: zsh2517 分类: 工具,编程,脚本 发布时间: 2021-06-01 13:27

前言

python 的多线程由于 GIL 锁的存在,对于 CPU 密集的操作并不友好。很容易出现 1 核有难,23 核围观的情况。

而很多时候,这个 CPU 密集的计算任务是可以拆分的,可以将其分成多个进程的方式,由系统完成分配进程到不同的核心上运行,以充分利(压)(榨) CPU 的性能。

到现在跑过的一些任务,比如某语料库预处理(千万级别的文字进行分词)、B站弹幕 CRC32 爆破 UID(弹幕 XML 数据是 uid 字符串经过 CRC32 编码后的)、微博历史热搜数据解析等,都是大量重复性操作。而在之前大多是手动通过

python3 proc.py < input1 > output1
python3 proc.py < input2 > output2
python3 proc.py < input3 > output3
...

的方式进行运行的,然后多开 console 以查看进度。因此考虑程序化这个过程

可以采用一个很简单的分配方案,比如拆分成 20 个任务,每次运行 4 个,那么不断检查是否有 4 个任务进程在运行,如果不足 4 个且有未运行的任务,那么运行新的任务,直到 20 个都完成。然后依靠 python 本身的 process 进行封装即可。

计划(和实际)效果如图

通信

通信采用 Queue 进行进程间的通信。

后续有时间继续更新

代码

# from multiprocessing import Process
import multiprocessing as mp
import time
import os
from mf import myFunc


class MyProcessManager():
    func = None
    argList: list = []
    queue = None
    naxProcessCount = 16
    processList = []
    cntProcessIndex = 0

    class MyChildProcess():
        func = None
        arg = None
        queue = None
        index = None
        p = None

        def __init__(self, func, index, arg, queue):
            self.func = func
            self.arg = arg
            self.index = index
            self.queue = queue
            self.p = mp.Process(target=self.proc, args=(self.arg, self.queue))

        def getInstance(self):
            return self.p

        def proc(self, arg, queue):
            self.queue.put((self.index, "start"))

            def callback(msg):
                queue.put((self.index, "run", msg))
            self.func(**arg, callback=callback)
            self.queue.put((self.index, "stop"))

        def start(self):
            self.p.start()

    def __init__(self):
        mp.set_start_method('spawn')
        self.queue = mp.Queue()

    def setArgs(self, argList):
        self.argList = argList

    def setFunc(self, func):
        self.func = func

    def start(self):
        status = []
        for index in range(len(self.argList)):
            status.append(-1)
            arg = self.argList[index]
            temp = self.MyChildProcess(self.func, index, arg, self.queue)
            self.processList.append(temp.getInstance())
        while True:
            try:
                x = self.queue.get_nowait()
                if x[1] == "run":
                    status[x[0]] = x[2]
                if x[1] == "start":
                    status[x[0]] = 0
                if x[1] == "stop":
                    status[x[0]] = 101
            except Exception as e:
                time.sleep(0.2)
            finally:
                os.system("cls")
                for i in range(0, len(status)):
                    if status[i] == 101 or (status[i] != -1 and not self.processList[i].is_alive()):
                        print("%4d[%100s], finished" % (i, "*" * 100))
                    elif status[i] == -1:
                        print("%4d not start" % (i, ))
                    else:
                        print("%4d[%100s]" % (
                            i, "*" * status[i] + " " * (100 - status[i])))
                if self.cntProcessIndex < len(self.processList):
                    count = 0
                    for i in range(0, len(self.processList)):
                        if self.processList[i].is_alive():
                            count += 1
                    if count < self.naxProcessCount:
                        self.processList[self.cntProcessIndex].start()
                        self.cntProcessIndex += 1
                    print(count)
                else:
                    flag = True
                    for i in range(0, len(self.processList)):
                        if status[i] != 101:
                            flag = False
                    if flag:
                        break


if __name__ == '__main__':
    x = MyProcessManager()
    x.setFunc(myFunc)

    def generate():
        left = 0
        right = 80000000
        ans = []
        import random
        while True:
            length = random.randint(1000000, 5000000)
            ans.append({
                "left": left,
                "right": left + length if left + length <= right else right
            })
            left += length
            if left > right:
                break
        return ans

    l = generate()
    print(l)
    x.setArgs(l)

    x.start()

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注