您现在的位置是:网站首页 > 博客日记 >

python进程、线程通信

作者:YXN-python 阅读量:24 发布日期:2024-11-25

进程通信

管道(Pipes)

os.pipe():创建一个管道,用于父子进程之间的通信。

multiprocessing.Pipe():创建一个双向管道,用于进程间的通信。

import multiprocessing

def child_proc(pipe):
    pipe.send("从子进程访问!")
    pipe.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child_proc, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # 接收来自子进程的消息
    p.join()

队列(Queues)

multiprocessing.Queue():创建一个队列,用于多个进程之间的通信,它是线程和进程安全的。

from multiprocessing import Process, Queue

def worker(queue):
    queue.put('子进程压入数据')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())
    p.join()

共享内存(Shared Memory)

multiprocessing.Value() 和 multiprocessing.Array():用于共享简单的数据类型。

multiprocessing.Manager():可以创建不同类型的共享对象,如字典、列表等。

from multiprocessing import Process, Value

def modify_value(val):
    val.value = 10

if __name__ == '__main__':
    num = Value('i', 0)
    p = Process(target=modify_value, args=(num,))
    p.start()
    p.join()
    print(num.value)

套接字(Sockets)

使用TCP/IP或UDP/IP套接字进行进程间通信,适用于不同机器上的进程通信。

from multiprocessing import Process, Pipe
import socket

def server(host, port, conn):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((host, port))
    server.listen(1)
    client, addr = server.accept()
    conn.send(client.makefile('r'))  # 将文件对象发送给父对象
    server.close()

def client_process(conn):
    client_file = conn.recv()
    print(client_file.read())  # 从客户端读取

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=server, args=('localhost', 9999, parent_conn))
    p.start()
    client_process(child_conn)
    p.join()

管理器(Managers):

Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构。

from multiprocessing import Process, Manager

def modify_dict(d):
    d['key'] = 'value'

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        p = Process(target=modify_dict, args=(d,))
        p.start()
        p.join()
        print(d['key'])

内存映射文件(Memory-mapped files)

使用mmap模块,允许多个进程共享一个文件的内容。

from multiprocessing import Process
import mmap
import os

def child_process(fd):
    with mmap.mmap(fd, 256, access=mmap.ACCESS_WRITE) as mm:
        mm[10:20] = b'child'

if __name__ == '__main__':
    with open('/tmp/mmap_test', 'wb', 256) as f:
        f.write(b'parent' * 20)
    fd = os.open('/tmp/mmap_test', os.O_RDWR)
    p = Process(target=child_process, args=(fd,))
    p.start()
    p.join()
    with open('/tmp/mmap_test', 'rb') as f:
        print(f.read(20))  # 应打印 'parent' + 'child' + 'parent'

 

线程通信

线程通信主要是指在同一个进程中不同线程之间的数据交换和同步。由于线程共享相同的内存空间,因此线程间的通信通常比进程间通信要简单和高效。以下是一些在Python中常用的线程通信和同步机制:

锁(Locks)

threading.Lock():最基本的线程同步机制,用于防止多个线程同时访问共享资源。

import threading

# 使用Lock
lock = threading.Lock()
shared_resource = 0

def update_resource():
    global shared_resource
    with lock:
        shared_resource += 1

[threading.Thread(target=update_resource).start() for i in range(5)]
print(shared_resource)

事件(Events)

threading.Event():用于线程间的信号传递,一个线程可以等待事件被设置,而另一个线程可以设置事件。

import threading
import time

# 创建一个事件对象
event = threading.Event()

# 等待事件被设置
def wait_on_event(event):
    print("等待事件被设置...")
    event.wait()  # 等待事件被设置
    print("事件已设置")

# 设置事件
def set_event(event):
    print("设置事件...")
    time.sleep(2)  # 模拟耗时操作
    event.set()  # 设置事件

# 创建并启动线程
wait_thread = threading.Thread(target=wait_on_event, args=(event,))
set_thread = threading.Thread(target=set_event, args=(event,))

wait_thread.start()
set_thread.start()

wait_thread.join()
set_thread.join()

条件变量(Condition Variables)

threading.Condition():比锁更高级的同步机制,允许一个或多个线程等待某个条件成立。

import threading

condition = threading.Condition()

def consumer():
    with condition:
        condition.wait()  # 等待条件成立
        print("资源可用,正在消耗...")

def producer():
    with condition:
        print("生产资源...")
        condition.notify()  # 通知消费者资源可用

threading.Thread(target=consumer).start()
threading.Thread(target=producer).start()

信号量(Semaphores)

threading.Semaphore():用于限制对共享资源的访问数量。

import threading
import time

semaphore = threading.Semaphore(2)

def access_resource():
    with semaphore:
        print("访问共享资源...")
        time.sleep(1)

# 使用Semaphore
[threading.Thread(target=access_resource).start() for _ in range(3)]

队列(Queues)

queue.Queue():线程安全的队列,适用于生产者-消费者模式。

import threading
import queue
import time

# 创建一个线程安全的队列
q = queue.Queue()


# 生产者线程
def producer(q, count):
    for i in range(count):
        q.put(i)  # 将项目放入队列
        print(f"生产 {i}")
        time.sleep(0.1)
    print("生产完成")


# 消费者线程
def consumer(q):
    while True:
        item = q.get()  # 从队列中获取项目
        if item is None:  # 使用None作为结束信号
            q.task_done()
            break
        print(f"消耗 {item}")
        time.sleep(0.2)
        q.task_done()


# 创建并启动生产者线程
producer_thread = threading.Thread(target=producer, args=(q, 10))
# 创建并启动消费者线程
consumer_thread = threading.Thread(target=consumer, args=(q,))

producer_thread.start()
consumer_thread.start()

# 等待生产者线程完成
producer_thread.join()

# 所有生产完成后,向队列中添加None作为结束信号
# 只需要一个None信号即可
q.put(None)

# 等待队列中的所有项目被处理完毕
q.join()

# 等待消费者线程完成
consumer_thread.join()

print("所有任务都完成了.")

屏障(Barriers)

threading.Barrier():用于让一组线程相互等待,直到所有线程都到达某个点。

import threading

# 创建一个屏障对象,需要3个线程到达屏障点
barrier = threading.Barrier(3)


# 定义一个函数,使用屏障来同步线程
def do_task(n):
    print(f"线程 {n} 正在工作...")
    barrier.wait()  # 等待其他线程到达屏障点
    print(f"线程 {n} 在屏障后继续")


# 创建并启动线程
[threading.Thread(target=do_task, args=(i,)).start() for i in range(3)]

读写锁(Reader-Writer Locks)

threading.RLock() 和 threading.Lock() 可以组合使用,用于实现读写锁,允许多个读操作同时进行,但写操作是独占的。

import threading

# 创建一个可重入锁
rlock = threading.RLock()

# 创建一个读锁和写锁
read_lock = threading.Lock()
write_lock = threading.RLock()

# 定义一个函数,使用读写锁来同步线程
def read_data():
    with read_lock:
        with rlock:  # 确保没有写操作
            print("读取数据...")

def write_data():
    with write_lock:
        with rlock:  # 确保没有读或写操作
            print("写入数据...")

# 创建并启动线程
read_thread = threading.Thread(target=read_data)
write_thread = threading.Thread(target=write_data)

read_thread.start()
write_thread.start()

read_thread.join()
write_thread.join()

YXN-python

2024-11-25