线程同步与多进程
概述
线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作;
Event
Event
事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag
,能过flag
的True
或者False
的变化来进行操作
名称 | 含义 |
---|---|
set() |
标记设置为True。 |
clear() |
标记设置为False。 |
is_set() |
标记是否为True。 |
import threading
e = threading.Event()
print(e.is_set()) # False
print(e.set()) # 给e设置一个标志位, 为True
print(e.is_set()) # True
import threading
import logging
import time
FOMATE = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FOMATE)
# 1. 一个老板,雇佣了10个员工,工人生成10个杯子;
# 2. 工人生成中,老板盯着他,工人生产完,老板夸他;
# flag = False
event = threading.Event() # 事件对象,替代了flag
def boss():
logging.info("I'm boss, I'm watching you ~~~")
event.wait() # 阻塞等待,直到flag为True
logging.info("Good job")
def worker(count=10):
global flag
logging.info("I'm working for U.")
cups = []
while True:
time.sleep(1)
cups.append("1")
if len(cups) >= count:
event.set() # 设置flag为True
break
logging.info("Finished ,cpus={}".format(len(cups)))
b1 = threading.Thread(target=boss, name='boss1')
b2 = threading.Thread(target=boss, name='boss2')
w = threading.Thread(target=worker, name='worker')
b1.start()
b2.start()
w.start()
- 延迟执行的线程
import threading
e = threading.Event()
print(e.is_set())
print(e.wai(3))
print(e.is_set())
threading.Timer(3, lambda : e.set()).start()
print('-' * 30 )
print(e.wait(10))
print(e.is_set())
import threading
import logging
import time
FOMATE = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FOMATE)
# 1. 一个老板,雇佣了10个员工,工人生成10个杯子;
# 2. 工人生成中,老板盯着他,工人生产完,老板夸他;
# flag = False
event = threading.Event() # 事件对象,替代了flag
def boss(e):
logging.info("I'm boss, I'm watching you ~~~")
e.wait() # 阻塞等待,直到flag为True
logging.info("Good job")
def worker(e,count=10):
global flag
logging.info("I'm working for U.")
cups = []
while not e.wait(0.5):
# time.sleep(1)
cups.append("1")
if len(cups) >= count:
event.set() # 设置flag为True
#
logging.info("Finished ,cpus={}".format(len(cups)))
b1 = threading.Thread(target=boss, name='boss1',args=(event,)) # event 用于一对多的通知
b2 = threading.Thread(target=boss, name='boss2',args=(event,))
w = threading.Thread(target=worker, name='worker',args=(event,))
b1.start()
b2.start()
w.start()
Lock
- Lock类是mutex互斥锁
- 一旦一个线程获得锁,其它试图获取锁的线程将被阻塞,只到拥有锁的线程释放锁
- 凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。
名称 | 含义 |
---|---|
acquire(blocking=True, timeout=-1) |
默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout 禁止设置。成功获取锁,返回True,否则返回False。 |
release() |
释放锁。可以从任何线程调用释放。已上锁的锁会被重置为unlocked。未上锁的锁上调用,抛 RuntimeError 异常。 |
import threading
lock = threading.Lock()
print(lock.locked())
print(lock.acquire())
print(lock.locked())
print(lock.acquire(timeout=3))
print('='*30)
print(lock.acquire(False)) # 非阻塞的时候不能指定timeout
print('='*30)
import logging
import threading
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
lock = threading.Lock()
def worker():
logging.info('I am working ~~~')
lock.acquire()
logging.info('end working ~~~')
for i in range(5):
threading.Thread(target=worker,daemon=False, name='worker-{}'.format(i)).start()
while True:
cmd = input('>>>')
if cmd == 'r':
lock.release()
print('release one times')
elif cmd == 'q':
break
else:
print(threading.enumerate())
print(lock.locked())
# 以上说明多个线程共享一个锁,一个线程获取锁后,其他线程就不能获取锁,只能等待锁释放后,才能获取锁,这是跨线程操作的
- 锁简单的示例
import logging
import threading
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
# 10个工人共同生产100个杯子
lock = threading.Lock() # 锁对象
cpus = []
def worker(count=100):
logging.info('working ~~~')
while True:
lock.acquire()
if len(cpus) >= count:
lock.release() # 第一处释放锁
break
time.sleep(0.01)
# lock.release() # 第二处释放锁
cpus.append('1')
lock.release() # 第三处释放锁, 哪一处是对的?
logging.info('finished ~~~ cpus={}'.format(len(cpus)))
for i in range(10):
threading.Thread(target=worker, name='worker-{}'.format(i)).start() # 10个线程,共同生产100个杯子
- 锁分析
位置2分析:
- 假设某一个瞬间,有一个工作线程A获取了锁,len(cups)正好有999个,然后就释放了锁,可以继续执行下面的语句,生产一个杯子,这地方不阻塞,但是正好杯子也没有生产完。锁释放后,其他线程就可以获得锁,线程B获得了锁,发现len(cups)也是999个,然后释放锁,然后也可以去生产一个杯子。锁释放后,其他的线程也可能获得锁。就说A和B线程都认为是999个,都会生产一个杯子,那么实际上最后一定会超出1000个。
- 假设某个瞬间一个线程获得锁,然后发现杯子到了1000个,没有释放锁就直接break了,由于其他线程还在阻塞等待锁释放,这就成了死锁了。
位置3分析:
- 获得锁的线程发现是999,有资格生产杯子,生产一个,释放锁,看似很完美
- 问题在于,获取锁的线程发现杯子有1000个,直接break,没释放锁离开了,死锁了
位置1分析:
- 如果线程获得锁,发现是1000,break前释放锁,没问题
- 问题在于,A线程获得锁后,发现小于1000,继续执行,其他线程获得锁全部阻塞。A线程再次执行循环后,自己也阻塞了。死锁了。
锁用完了、一定要释放,即上下文干的事儿、进去要干啥、离开后一定要干啥~~
锁的应用场景
锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
如果全部都是读取同一个共享资源需要锁吗?
- 不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一桦的值,所以不用加锁。
使用锁的注意事项:
- 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行。举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
- 加锁时间越短越好,不需要就立即释放锁。
- 一定要避免死锁。
不使用锁,有了效率,但是结果是错的。
使用了锁,效率低下,但是结果是对的。
GIL全局解释器锁
CPython 在解释器进程级别有一把锁,叫做GIL,即全局解释器锁。
GIL 保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也只允许同时只能有一个CPU核心上运行该进程的一个线程。
在CPython中:
- IO密集型任务,当某个线程阻塞时,GIL会释放,就会调度其他就绪线程。
- CPU密集型任务,当前线程可能会连续的获得GIL,导致其它线程几乎无法使用CPU。
- 在CPython中由于有GIL存在,IO密集型任务使用多线程较为合算;CPU密集型任务使用多进程,要绕开GIL。
新版CPython正在努力优化GIL的问题,但不是移除。
如果在意多线程的效率问题,请绕行,选择其它语言如Erlang、Go等。
Python中绝大多数内置数据结构的读、写操作都是原子操作。由于GIL的存在,Python的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它们本身不是线程安全类型。
保留GIL的原因:
GvR坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全、简单的使用Python。而且移除GIL,会降低CPython单线程的执行效率。
import threading
import datetime
import logging
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
def calc():
s = 0
for i in range(100000000):
s += 1
logging.info(s)
start = datetime.datetime.now()
t1 = threading.Thread(target=calc)
t2 = threading.Thread(target=calc)
t3 = threading.Thread(target=calc)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
delta = (datetime.datetime.now() - start).total_seconds() # 假并行,GIL锁
print(delta) #执行时间7.418743s
Queue的线程安全
标准库 queue
模块提供了 FIFO(先进先出)队列、LIFO(后进先出)队列以及优先队列等数据结构。Queue 类是线程安全的,适用于同一进程内多线程之间安全地交换数据。它内部使用了 Lock 和 Condition 来确保线程安全。
需要特别注意在多线程中使用 Queue 类时,例如以下代码:
import queue
q = queue.Queue(8)
if q.qsize() == 7:
q.put() # 上下两句可能被打断
if q.qsize() == 1:
q.get() # 未必会成功
如果不加锁,就无法确保获得准确的队列大小。这是因为在读取队列大小后,其他线程可能会修改队列,导致大小不再准确。因此,正确的做法是在进行 get
和 put
操作时加锁,以确保操作的原子性和线程安全。
即使 Queue
类的 size
方法加了锁,也不能保证立即执行 get
和 put
操作就一定成功。因为在执行这些操作时,可能会受到其他线程的干扰,导致操作无法立即完成。因此,在实际应用中,需要注意处理可能出现的竞态条件和线程安全问题。
多进程
由于 Python 的 GIL(全局解释器锁)存在,多线程未必是 CPU 密集型程序的好选择。多线程虽然可以实现并发执行,但由于 GIL 的限制,无法充分利用多核 CPU。
相比之下,多进程可以在完全独立的进程环境中运行程序,可以更充分地利用多处理器。但是进程之间的隔离带来了数据不共享的问题,而且进程的创建和销毁开销较大。
Python 的 multiprocessing
模块提供了多进程编程的支持,其中的 Process
类遵循了 Thread
类的 API,减少了学习难度。
由于 Python 的 GIL(全局解释器锁)存在,多线程未必是 CPU 密集型程序的好选择。多线程虽然可以实现并发执行,但由于 GIL 的限制,无法充分利用多核 CPU。
相比之下,多进程可以在完全独立的进程环境中运行程序,可以更充分地利用多处理器。但是进程之间的隔离带来了数据不共享的问题,而且进程的创建和销毁开销较大。
Python 的 `multiprocessing` 模块提供了多进程编程的支持,其中的 `Process` 类遵循了 `Thread` 类的 API,减少了学习难度。
对于上面这个程序,在同一主机(授课主机)上运行时长的对比如下:
- 使用单线程和多线程运行时长约为 4 分钟多。
- 使用多进程时长约为 1 分半。
可以观察到,多进程可以实现真正的并行,因为多个进程都在同时使用 CPU,而且进程库几乎没有什么学习难度。
需要注意的是,多进程的代码一定要放在 if __name__ == "__main__":
下面执行,以免在子进程中再次启动新的进程。
名称 | 说明 |
---|---|
pid |
进程id。 |
exitcode |
进程的退出状态码。 |
terminate() |
终止指定的进程。 |
进程间同步
Python 在进程间同步提供了和线程同步一样的类,使用的方法和效果也类似。不过,进程间同步的代价要高于线程间,因为涉及到进程的创建、销毁以及数据传输等开销,而且系统底层实现也不同。但是 Python 屏蔽了这些细节,让用户可以简单地使用多进程。
multiprocessing
模块还提供了共享内存和服务器进程来实现进程间数据的共享,同时也提供了用于进程间通信的 Queue
队列和 Pipe
管道等通信方式。
需要注意的是,多进程是启动多个解释器进程,因此进程间通信必须进行序列化和反序列化操作。此外,由于每个进程都有自己的 GIL,因此如果每个进程中没有实现多线程,全局解释器锁(GIL)在多进程中就失去了作用。
多进程、多线程的选择
CPU密集型任务:
在 CPython 中由于全局解释器锁(GIL)的存在,多线程时会存在锁的竞争,而且多核优势无法发挥,因此选择 Python 多进程效率更高。IO密集型任务:
在 Python 中适合使用多线程,可以减少多进程间 IO 的序列化开销。在 IO 等待的时候,可以切换到其他线程继续执行,从而提高效率。
应用场景
- 请求/应答模型:
这是 WEB 应用中常见的处理模型。主进程启动多个 worker 工作进程,通常数量和 CPU 核心数相同,以发挥多核优势。
在 worker 工作进程中,经常需要处理网络 IO 和磁盘 IO,可以启动多个线程来提高并发处理能力。
Worker 进程处理用户的请求时,往往需要等待数据,并在处理完请求后通过网络 IO 返回响应。这种工作模式类似于 Nginx。
concurrent.futures
模块
concurrent.futures
模块是在 Python 3.2 版本引入的,它提供了一个高级的异步可执行的便利接口,用于异步并行任务编程。
提供的执行器
ThreadPoolExecutor:
提供了异步调用的线程池的 Executor。它可以用于在单个主线程中并发执行多个任务,适用于 IO 密集型任务,可以避免由于 GIL 的存在导致的并发问题。ProcessPoolExecutor:
提供了异步调用的进程池的 Executor。它可以用于在多个进程中并发执行任务,每个任务在独立的进程中执行,适用于 CPU 密集型任务,可以充分利用多核处理器的优势。
ThreadPoolExecutor
对象
方法 | 含义 |
---|---|
ThreadPoolExecutor(max_workers=1) |
池中至多创建 max_workers 个线程的池来同时异步执行,返回 Executor 实例。支持上下文,进入时返回自己,退出时调用shutdown(wait=True) |
shutdown(wait=True) |
清理池,wait 表示是否等待到任务线程完成。 |
submit(fn, *args, **kwargs) |
提交执行的函数及其参数,如有空闲开启daemon线程,返回 Future 类的实例。 |
Future
类
方法 | 含义 |
---|---|
done() |
如果调用被成功的取消或者执行完成,返回True。 |
cancelled() |
如果调用被成功的取消,返回True。 |
running() |
如果正在运行且不能被取消,返回True。 |
cancel() |
尝试取消调用。如果已经执行且不能取消返回False,否则返回True。 |
result(timeout=None) |
取返回的结果,timeout 为None,一直等待返回;timeout 设置到期,抛出concurrent.futures.TimeoutError 异常。 |
exception(timeout=None) |
取返回的异常,timeout 为None,一直等待返回;timeout 设置到期,抛出concurrent.futures.TimeoutError 异常。 |
import threading
import datetime
import logging
import multiprocessing
import time
from concurrent.futures import ThreadPoolExecutor,wait
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
def calc(base):
s = base
for i in range(100000000):
s += 1
logging.info(s)
return s
# executor = ThreadPoolExecutor(max_workers=3) # 线程池, 3个线程, 预先开好三个泳道, 有任务来了,就直接跳进泳道,即资源有限,复用泳道
# future = executor.submit(calc, 100)
start = datetime.datetime.now()
executor = ThreadPoolExecutor(max_workers=3)
fs = []
for i in range(6):
future = executor.submit(calc, i*100)
fs.append(future)
# future.done() # 判断任务是否完成
# future.result() # 如果正在运行,会阻塞等待,直到任务完成
wait(fs) # 等待fs中的所有任务完成
delta = (datetime.datetime.now() - start).total_seconds()
print(delta) # 14.902915
# while True:
# time.sleep(1)
# print(threading.enumerate())
总结
该库统一了线程池、进程池调用,简化了编程;
是Python简单的思想哲学的体现;
缺点:无法设置线程名称。