您现在的位置是:网站首页 > 博客日记 >

python进程multiprocessing

作者:YXN-python 阅读量:73 发布日期:2024-05-20

进程

Python入门之——进程multiprocessing(Process 类)简介

Python进程multiprocessing.Process()的使用解读

基本实例

P = Process(target=run,args=(“nice”,),name=‘当前进程名称’)

target指定 子进程运行的函数

args 指定传递的参数 , 是元组类型

启动进程:Process对象.start()

import multiprocessing

def worker(name):
    print(f'已启动进程ID: {multiprocessing.current_process().name}, name: {name}')

if __name__ == '__main__':
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(f'进程-{i}',))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

获取进程信息

os.getpid() 获取当前进程id号

os.getppid() 获取当前进程的父进程id号

multiprocessing.current_process().name 获取当前进程名称

获取CPU核心数

multiprocessing.cpu_count()

父子进程的先后顺序

默认 父进程的结束不能影响子进程 让父进程等待子进程结束再执行父进程

p.join() 阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。

全局变量在过个进程中不能共享

注意: 在子线程中修改全局变量时对父进程中的全局变量没有影响

进程池

import multiprocessing

def worker(arg):
    print("Worker: {}".format(arg))

if __name__ == "__main__":
    # 创建一个进程池
    pool = multiprocessing.Pool(processes=4)

    # 向进程池中提交4个任务
    for i in range(4):
        pool.apply_async(worker, (i,))

    # 等待进程池中的所有任务完成
    pool.close()
    pool.join()

map方法

from multiprocessing import Pool

def scrape(url):
    return url**2

if __name__ == '__main__':
    pool = Pool(processes=4)
    urls = [1, 2, 3, 4, 5]
    res = pool.map(scrape, urls)
    print(res)

全局变量在多个子进程中不能共享

原因:在创建子进程时对全局变量做了一个备份,父进程中num变量与子线程中的num不是一个变量

from multiprocessing import Process

#全局变量在进程中 不能共享
num = 10

def run():
    print("我是子进程的开始")
    global num
    num+=1
    print(num)
    print("我是子进程的结束")

if __name__=="__main__":
    p = Process(target=run)
    p.start()
    p.join()

    print(num)

进程同步

多个进程可能会争用资源,这时我们需要进行同步,避免多个进程同时操作同一资源。multiprocessing提供了Lock和Semaphore等同步机制。

使用Lock来确保同步

import multiprocessing
import time

def worker(lock, number):
    with lock:
        print(f"进程 {number} 开始...")
        time.sleep(1)
        print(f"进程 {number} 完成")

if __name__ == '__main__':
    lock = multiprocessing.Lock()

    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(lock, i))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

进程间通信

队列共享

导入

from multiprocessing import Queue

使用

que = Queue() #创建队列

que.put(数据) #压入数据

que.get() #获取数据

队列常用函数

Queue.empty() 如果队列为空,返回True, 反之False

Queue.full() 如果队列满了,返回True,反之False

Queue.get([block[, timeout]]) 获取队列,timeout等待时间

Queue.get_nowait() 相当Queue.get(False)

Queue.put(item) 阻塞式写入队列,timeout等待时间

Queue.put_nowait(item) 相当Queue.put(item, False)

特点:先进先出

注意:

get方法有两个参数,blocked和timeout,意思为阻塞和超时时间。默认blocked是true,即阻塞式。

当一个队列为空的时候如果再用get取则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用get_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有取到队列元素,那就抛出Queue.Empty异常。

当一个队列为满的时候如果再用put放则会阻塞,所以这时候就需要吧blocked设置为false,即非阻塞式,实际上它就会调用put_nowait()方法,此时还需要设置一个超时时间,在这么长的时间内还没有放进去元素,那就抛出Queue.Full异常。

队列的大小

Queue.qsize() 返回队列的大小 ,不过在 Mac OS 上没法运行。

import multiprocessing

queque = multiprocessing.Queue()  # 创建 队列

# 如果在子进程 和主进程 之间 都压入了数据 那么在主进程 和 子进程 获取的就是 对方的数据
def fun(myque):
    myque.put(['a', 'b', 'c'])  # 在子进程里面压入数据

if __name__ == '__main__':
    queque.put([1, 2, 3, 4, 5])  # 将列表压入队列  如果主进程也压入了数据 那么在主进程取的就是在主进程压入的数据 而不是子进程的
    p = multiprocessing.Process(target=fun, args=(queque,))
    p.start()
    p.join()
    print("主进程获取", queque.get())  # 在主进程进行获取
    print("主进程获取", queque.get())  # 在主进程进行获取

列表共享

import multiprocessing

def fun(lst):
    lst.append('x')
    lst.append('y')
    lst.append('z')

if __name__ == '__main__':
    # Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构。
    List = multiprocessing.Manager().list()
    p = multiprocessing.Process(target=fun, args=(List,))
    p.start()
    p.join()
    print(List)

字典共享

import multiprocessing

def fun(mydict):
    mydict['x'] = 'x'
    mydict['y'] = 'y'
    mydict['z'] = 'z'

if __name__ == '__main__':
    # Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构。
    mydict = multiprocessing.Manager().dict()
    p = multiprocessing.Process(target=fun, args=(mydict,))
    p.start()
    p.join()
    print(mydict)

deamon

每个进程程都可以单独设置它的属性,如果设置为True,当父进程结束后,子进程会自动被终止。

进程.daemon = True

设置在start()方法之前

import multiprocessing
import time

def fun():
    time.sleep(100)

if __name__ == '__main__':
    p = multiprocessing.Process(target=fun)
    p.daemon = True
    p.start()
    print('over')

强行终止子进程

进程名.terminate() 强行终止子进程

import multiprocessing
import time

def fun():
    time.sleep(100)

if __name__ == '__main__':
    p = multiprocessing.Process(target=fun)
    p.start()
    p.terminate()
    p.join()
    print('over')

 

 

YXN-python

2024-05-20