python进程、线程通信
作者:YXN-python 阅读量:24 发布日期:2024-11-25
进程通信
管道(Pipes)
os.pipe():创建一个管道,用于父子进程之间的通信。
multiprocessing.Pipe():创建一个双向管道,用于进程间的通信。
import multiprocessing
def child_proc(pipe):
pipe.send("从子进程访问!")
pipe.close()
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=child_proc, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 接收来自子进程的消息
p.join()
队列(Queues)
multiprocessing.Queue():创建一个队列,用于多个进程之间的通信,它是线程和进程安全的。
from multiprocessing import Process, Queue
def worker(queue):
queue.put('子进程压入数据')
if __name__ == '__main__':
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
print(q.get())
p.join()
共享内存(Shared Memory)
multiprocessing.Value() 和 multiprocessing.Array():用于共享简单的数据类型。
multiprocessing.Manager():可以创建不同类型的共享对象,如字典、列表等。
from multiprocessing import Process, Value
def modify_value(val):
val.value = 10
if __name__ == '__main__':
num = Value('i', 0)
p = Process(target=modify_value, args=(num,))
p.start()
p.join()
print(num.value)
套接字(Sockets)
使用TCP/IP或UDP/IP套接字进行进程间通信,适用于不同机器上的进程通信。
from multiprocessing import Process, Pipe
import socket
def server(host, port, conn):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host, port))
server.listen(1)
client, addr = server.accept()
conn.send(client.makefile('r')) # 将文件对象发送给父对象
server.close()
def client_process(conn):
client_file = conn.recv()
print(client_file.read()) # 从客户端读取
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=server, args=('localhost', 9999, parent_conn))
p.start()
client_process(child_conn)
p.join()
管理器(Managers):
Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构。
from multiprocessing import Process, Manager
def modify_dict(d):
d['key'] = 'value'
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
p = Process(target=modify_dict, args=(d,))
p.start()
p.join()
print(d['key'])
内存映射文件(Memory-mapped files)
使用mmap模块,允许多个进程共享一个文件的内容。
from multiprocessing import Process
import mmap
import os
def child_process(fd):
with mmap.mmap(fd, 256, access=mmap.ACCESS_WRITE) as mm:
mm[10:20] = b'child'
if __name__ == '__main__':
with open('/tmp/mmap_test', 'wb', 256) as f:
f.write(b'parent' * 20)
fd = os.open('/tmp/mmap_test', os.O_RDWR)
p = Process(target=child_process, args=(fd,))
p.start()
p.join()
with open('/tmp/mmap_test', 'rb') as f:
print(f.read(20)) # 应打印 'parent' + 'child' + 'parent'
线程通信
线程通信主要是指在同一个进程中不同线程之间的数据交换和同步。由于线程共享相同的内存空间,因此线程间的通信通常比进程间通信要简单和高效。以下是一些在Python中常用的线程通信和同步机制:
锁(Locks)
threading.Lock():最基本的线程同步机制,用于防止多个线程同时访问共享资源。
import threading
# 使用Lock
lock = threading.Lock()
shared_resource = 0
def update_resource():
global shared_resource
with lock:
shared_resource += 1
[threading.Thread(target=update_resource).start() for i in range(5)]
print(shared_resource)
事件(Events)
threading.Event():用于线程间的信号传递,一个线程可以等待事件被设置,而另一个线程可以设置事件。
import threading
import time
# 创建一个事件对象
event = threading.Event()
# 等待事件被设置
def wait_on_event(event):
print("等待事件被设置...")
event.wait() # 等待事件被设置
print("事件已设置")
# 设置事件
def set_event(event):
print("设置事件...")
time.sleep(2) # 模拟耗时操作
event.set() # 设置事件
# 创建并启动线程
wait_thread = threading.Thread(target=wait_on_event, args=(event,))
set_thread = threading.Thread(target=set_event, args=(event,))
wait_thread.start()
set_thread.start()
wait_thread.join()
set_thread.join()
条件变量(Condition Variables)
threading.Condition():比锁更高级的同步机制,允许一个或多个线程等待某个条件成立。
import threading
condition = threading.Condition()
def consumer():
with condition:
condition.wait() # 等待条件成立
print("资源可用,正在消耗...")
def producer():
with condition:
print("生产资源...")
condition.notify() # 通知消费者资源可用
threading.Thread(target=consumer).start()
threading.Thread(target=producer).start()
信号量(Semaphores)
threading.Semaphore():用于限制对共享资源的访问数量。
import threading
import time
semaphore = threading.Semaphore(2)
def access_resource():
with semaphore:
print("访问共享资源...")
time.sleep(1)
# 使用Semaphore
[threading.Thread(target=access_resource).start() for _ in range(3)]
队列(Queues)
queue.Queue():线程安全的队列,适用于生产者-消费者模式。
import threading
import queue
import time
# 创建一个线程安全的队列
q = queue.Queue()
# 生产者线程
def producer(q, count):
for i in range(count):
q.put(i) # 将项目放入队列
print(f"生产 {i}")
time.sleep(0.1)
print("生产完成")
# 消费者线程
def consumer(q):
while True:
item = q.get() # 从队列中获取项目
if item is None: # 使用None作为结束信号
q.task_done()
break
print(f"消耗 {item}")
time.sleep(0.2)
q.task_done()
# 创建并启动生产者线程
producer_thread = threading.Thread(target=producer, args=(q, 10))
# 创建并启动消费者线程
consumer_thread = threading.Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()
# 等待生产者线程完成
producer_thread.join()
# 所有生产完成后,向队列中添加None作为结束信号
# 只需要一个None信号即可
q.put(None)
# 等待队列中的所有项目被处理完毕
q.join()
# 等待消费者线程完成
consumer_thread.join()
print("所有任务都完成了.")
屏障(Barriers)
threading.Barrier():用于让一组线程相互等待,直到所有线程都到达某个点。
import threading
# 创建一个屏障对象,需要3个线程到达屏障点
barrier = threading.Barrier(3)
# 定义一个函数,使用屏障来同步线程
def do_task(n):
print(f"线程 {n} 正在工作...")
barrier.wait() # 等待其他线程到达屏障点
print(f"线程 {n} 在屏障后继续")
# 创建并启动线程
[threading.Thread(target=do_task, args=(i,)).start() for i in range(3)]
读写锁(Reader-Writer Locks)
threading.RLock() 和 threading.Lock() 可以组合使用,用于实现读写锁,允许多个读操作同时进行,但写操作是独占的。
import threading
# 创建一个可重入锁
rlock = threading.RLock()
# 创建一个读锁和写锁
read_lock = threading.Lock()
write_lock = threading.RLock()
# 定义一个函数,使用读写锁来同步线程
def read_data():
with read_lock:
with rlock: # 确保没有写操作
print("读取数据...")
def write_data():
with write_lock:
with rlock: # 确保没有读或写操作
print("写入数据...")
# 创建并启动线程
read_thread = threading.Thread(target=read_data)
write_thread = threading.Thread(target=write_data)
read_thread.start()
write_thread.start()
read_thread.join()
write_thread.join()
YXN-python
2024-11-25