Python 线程
进程
multiprocessing
run()和start()的区别:
- 直接调用run()不会创建子进程,任务只会在当前进程中执行。
- start()会创建一个子进程,并在该子进程中调用run()方法。
全局变量不能被多进程共享:
import multiprocessing
import time
import os

# Global variables are NOT shared between processes: every child process works
# on its own copy of ``m``, so changes made in a child never reach the parent.
m = 0


def task1(*param, rounds=None, interval=1):
    """Print process info and add 1 to this process's copy of ``m`` each round.

    :param param: arbitrary positional values, echoed on every round.
    :param rounds: number of iterations; ``None`` (the default) loops forever.
    :param interval: seconds to sleep before each iteration.
    """
    global m
    done = 0
    while rounds is None or done < rounds:
        time.sleep(interval)
        print("Task 1")
        print("current pid:", os.getpid())
        print("parent pid:", os.getppid())
        print("param:", *param)
        m += 1
        print("m:", m)
        done += 1


def task2(rounds=None, interval=1):
    """Add 2 to this process's copy of ``m`` each round.

    :param rounds: number of iterations; ``None`` (the default) loops forever.
    :param interval: seconds to sleep before each iteration.
    """
    global m
    done = 0
    while rounds is None or done < rounds:
        time.sleep(interval)
        print("Task 2")
        m += 2
        print("m:", m)
        done += 1


if __name__ == "__main__":
    # ``args`` / ``kwargs`` are forwarded to the target function.
    t1 = multiprocessing.Process(target=task1, name="task_1", args=(1, 3),
                                 kwargs={"rounds": 3})
    t2 = multiprocessing.Process(target=task2, kwargs={"rounds": 3})
    t1.start()
    t2.start()
    print("main:", os.getpid())
    # The parent can read each child's pid once the child has been started.
    print("t1:", t1.pid)
    print("t2:", t2.pid)
    # Wait for both children, then show that the parent's ``m`` is untouched.
    t1.join()
    t2.join()
    print("main m:", m)
    print('-' * 30)
# 自定义进程 (custom Process subclass)
import multiprocessing
import time
import os


class MyProcess(multiprocessing.Process):
    """A Process subclass whose work is defined by overriding ``run()``.

    ``start()`` launches a child process that calls ``run()``; calling
    ``run()`` directly executes it in the current process instead.
    """

    # Number of iterations run() performs, and the pause before each one.
    rounds = 5
    interval = 1

    def run(self):
        """Print a counter ``rounds`` times, then return so the process exits.

        Returning from ``run()`` is the normal way for a process to finish.
        Calling ``self.terminate()`` here does not work: ``terminate()`` is
        meant to be called by the parent on a started process, and inside
        ``run()`` the object has no process handle, so it raises
        ``AttributeError`` instead of stopping cleanly.
        """
        for n in range(1, self.rounds + 1):
            time.sleep(self.interval)
            print("进程%s,n = %d" % (self.name, n))
            print(os.getpid())


if __name__ == "__main__":
    print("parent", os.getpid())
    t1 = MyProcess(name="t1")
    t1.start()
    t2 = MyProcess(name="t2")
    t2.start()
# 进程池 (process pools)
向进程池提交任务有两种方式:非阻塞式(apply_async)和阻塞式(apply)。
非阻塞式:
import multiprocessing, time, random, os


def task(task_name, max_sleep=2):
    """Simulate a job by sleeping a random time and report how long it took.

    :param task_name: label echoed in the output.
    :param max_sleep: upper bound, in seconds, of the random sleep.
    :return: ``(task_name, elapsed_seconds, worker_pid)``; the pool passes
        this tuple to ``callback`` in the parent process.
    """
    print("Starting task %s:" % task_name)
    # perf_counter() is monotonic; time.time() follows the wall clock and can
    # jump when it is adjusted, producing wrong (even negative) durations.
    start_timestamp = time.perf_counter()
    time.sleep(random.random() * max_sleep)
    end_timestamp = time.perf_counter()
    return task_name, end_timestamp - start_timestamp, os.getpid()


def callback(params):
    """Print the result tuple of a finished ``task`` (runs in the parent)."""
    task_name, consuming_time, pid = params
    print(" %s Took time %fs. The pid is %s." % (task_name, consuming_time, pid))


def error_callback(exc):
    """Report an exception raised by a task.

    The AsyncResult returned by apply_async is discarded below, so without
    this callback a failing task would go unnoticed.
    """
    print(" Task failed: %r" % (exc,))


if __name__ == "__main__":
    # A pool of 5 worker processes.
    pool = multiprocessing.Pool(processes=5)
    # 8 tasks: the first 5 start immediately, the rest are picked up as soon
    # as a worker becomes free.
    for i in range(8):
        # Non-blocking: apply_async returns immediately; ``callback`` receives
        # the task's return value once it finishes.
        pool.apply_async(func=task, kwds={"task_name": "Task %d" % (i + 1)},
                         callback=callback, error_callback=error_callback)
    pool.close()  # no further tasks may be submitted
    pool.join()   # wait for the workers to exit; must follow close()/terminate()
    print("Bye bye~")
"""
Outputs:
Starting task Task 1:
Starting task Task 2:
Starting task Task 3:
Starting task Task 4:
Starting task Task 5:
Starting task Task 6:
Task 1 Took time 0.116347s. The pid is 46471.
Task 4 Took time 0.260033s. The pid is 46470.
Starting task Task 7:
Starting task Task 8:
Task 5 Took time 0.529314s. The pid is 46473.
Task 7 Took time 0.591754s. The pid is 46470.
Task 6 Took time 1.115715s. The pid is 46471.
Task 8 Took time 0.707028s. The pid is 46473.
Task 2 Took time 1.733969s. The pid is 46472.
Task 3 Took time 1.979868s. The pid is 46474.
Bye bye~
"""
# 阻塞式 (blocking submission):
import multiprocessing, time, random, os


def task(task_name, max_sleep=2):
    """Simulate a job by sleeping a random time and print how long it took.

    :param task_name: label echoed in the output.
    :param max_sleep: upper bound, in seconds, of the random sleep.
    """
    print("Starting task %s:" % task_name)
    # perf_counter() is monotonic, unlike time.time(), so the measured
    # duration cannot be skewed by wall-clock adjustments.
    start_timestamp = time.perf_counter()
    time.sleep(random.random() * max_sleep)
    end_timestamp = time.perf_counter()
    print(" %s Took time %fs. The pid is %s." % (task_name, end_timestamp - start_timestamp, os.getpid()))


if __name__ == "__main__":
    # A pool of 5 worker processes.
    pool = multiprocessing.Pool(processes=5)
    for i in range(8):
        # Blocking: apply() waits for the task to finish and returns its
        # result directly, so no callback is needed. It also means the tasks
        # run one after another, so the pool gives no parallelism here.
        pool.apply(func=task, kwds={"task_name": "Task %d" % (i + 1)})
    pool.close()  # no further tasks may be submitted
    pool.join()   # wait for the workers to exit; must follow close()/terminate()
    print("Bye bye~")
"""
Outputs:
Starting task Task 1:
Task 1 Took time 1.793565s. The pid is 46434.
Starting task Task 2:
Task 2 Took time 1.371809s. The pid is 46435.
Starting task Task 3:
Task 3 Took time 0.684377s. The pid is 46438.
Starting task Task 4:
Task 4 Took time 1.410749s. The pid is 46436.
Starting task Task 5:
Task 5 Took time 1.717518s. The pid is 46437.
Starting task Task 6:
Task 6 Took time 1.030143s. The pid is 46434.
Starting task Task 7:
Task 7 Took time 0.700300s. The pid is 46435.
Starting task Task 8:
Task 8 Took time 1.696361s. The pid is 46438.
Bye bye~
"""
# 进程间通信 (inter-process communication)
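进程之间不共享内存(每个进程都有自己的一份全局变量),但可以借助进程间通信机制交换数据,例如multiprocessing提供的Queue、Pipe、Value/Array和Manager。
queue库示例(注意:queue.Queue只能在同一进程内的线程之间使用;跨进程通信需要使用multiprocessing.Queue):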
import queue

# queue.Queue is a thread-safe FIFO for threads inside ONE process; it cannot
# carry data between processes (multiprocessing.Queue does that).
q = queue.Queue(maxsize=5)
print(q.empty())  # True
for item in ("t1", "t2", "t3", "t4", "t5"):
    q.put(item)
print(q.full())  # True
# The queue is full, so a plain put() would block the calling thread until a
# slot frees up. No other thread ever calls get() here, so it would hang
# forever. get() behaves the same way on an empty queue.
# Both put() and get() accept ``timeout``: after waiting that long they raise
# queue.Full / queue.Empty respectively.
try:
    q.put("t6", timeout=0.1)
except queue.Full:
    print("queue is full, t6 was not added")
# 通过multiprocessing.Queue。略 (communication via multiprocessing.Queue is omitted here)
线程
基本使用
import threading, time


def download(n):
    """Pretend to download three images tagged with ``n``, one second each."""
    for name in ('cat', 'dog', 'flower'):
        label = "%s%d" % (name, n)
        print("Downloading %s..." % label)
        time.sleep(1)
        print(" %s Downloaded." % label)


if __name__ == '__main__':
    # Two threads run download() concurrently; while one sleeps, the other
    # can make progress.
    workers = [threading.Thread(target=download, args=(k,)) for k in (1, 2)]
    for worker in workers:
        worker.start()
# 全局变量可以被多线程共享 (global variables are shared between threads)
import threading
import time
import os

# Unlike processes, threads of the same process share module-level globals.
m = 0


def task1(rounds=None, interval=1):
    """Add 1 to the shared global ``m`` once per round.

    :param rounds: number of iterations; ``None`` (the default) loops forever.
    :param interval: seconds to sleep before each increment.
    """
    global m
    done = 0
    while rounds is None or done < rounds:
        time.sleep(interval)
        m += 1
        print("Task 1", "m:", m)
        done += 1


def task2(rounds=None, interval=1):
    """Add 2 to the shared global ``m`` once per round.

    :param rounds: number of iterations; ``None`` (the default) loops forever.
    :param interval: seconds to sleep before each increment.
    """
    global m
    done = 0
    while rounds is None or done < rounds:
        time.sleep(interval)
        m += 2
        print("Task 2", "m:", m)
        done += 1


if __name__ == "__main__":
    t1 = threading.Thread(target=task1, kwargs={"rounds": 3})
    t2 = threading.Thread(target=task2, kwargs={"rounds": 3})
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # Both threads updated the same variable: 3 * 1 + 3 * 2 == 9.
    print("main m:", m)
# 看下面的情况 (consider the following case):
import threading
import time
import os

m = 0


def _bump(times):
    """Add 1 to the global ``m`` ``times`` times, with no locking at all."""
    global m
    for _ in range(times):
        m += 1


def task1():
    """Increment ``m`` a million times unsynchronised, then print it."""
    _bump(1000000)
    print("Task 1")
    print(" m:", m)


def task2():
    """Same as task1; run concurrently, the two can lose each other's updates."""
    _bump(1000000)
    print("Task 2")
    print(" m:", m)


if __name__ == "__main__":
    workers = [
        threading.Thread(target=task1, name="task_1"),
        threading.Thread(target=task2),
    ]
    for worker in workers:
        worker.start()
"""
Outputs:
Task 2
m: 1234926
Task 1
m: 1352098
"""
# 为什么线程可以共享数据,但是这里的数据是错误的呢? (why is the shared result wrong here?)
因为:
如果多线程要共享数据,则必须要线程同步,才能使数据不会在资源抢占时发生不同步。实现线程同步、数据安全,就是给线程加锁。
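CPython解释器有一把全局解释器锁(GIL, Global Interpreter Lock),保证同一时刻只有一个线程在执行Python字节码。但GIL并不能替我们实现线程同步:`m += 1`会被拆成“读取m、加1、写回m”几条字节码,线程可能在这几步之间被切换,另一个线程的更新就会被覆盖而丢失。循环次数(数据量)越大,发生这种切换的机会越多,所以上面的结果明显小于2000000。此外,GIL使得纯Python的计算无法利用多核并行,这是CPython解释器的缺点。(计算密集的工作可以交给numpy这样的第三方库,它的许多运算在C代码中执行并会释放GIL。)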
正因为有GIL,CPython中threading的多个线程无法同时执行Python字节码,所以常被称为“伪多线程”。
总结:IO密集型的耗时操作(比如爬虫、文件/网络IO;线程在等待IO时会释放GIL)使用多线程;计算密集型任务使用多进程(每个进程有自己的解释器和GIL,可以真正利用多核并行)。
线程锁的使用
import threading

m = 0
# One lock guards every read-modify-write of ``m``.
lock = threading.Lock()


def task1():
    """Add 1 to ``m`` a million times, holding ``lock`` for each update."""
    global m
    for _ in range(1000000):
        # ``with`` releases the lock even if the body raises, which a bare
        # acquire()/release() pair does not guarantee.
        with lock:
            m += 1
    print("Task 1", "m:", m)


def task2():
    """Add 1 to ``m`` a million times, holding ``lock`` for each update."""
    global m
    for _ in range(1000000):
        with lock:
            m += 1
    print("Task 2", "m:", m)


if __name__ == "__main__":
    # With the lock, no update is lost: whichever thread finishes last
    # always prints 2000000.
    t1 = threading.Thread(target=task1, name="task_1")
    t2 = threading.Thread(target=task2)
    t1.start()
    t2.start()
"""
Outputs:
Task 2 m: 1745951
Task 1 m: 2000000
"""
# 死锁 (deadlock)
示例:
from threading import Thread, Lock
import time

lock1 = Lock()
lock2 = Lock()


def _lock_in_turn(thread_no, first, second):
    """Acquire two numbered locks one after the other, then release both.

    ``first`` and ``second`` are ``(lock_number, lock)`` pairs. After taking
    the first lock the thread sleeps for one second, which gives the other
    thread time to take its own first lock (the setup for the deadlock).
    Locks are released in reverse order of acquisition.
    """
    first_no, first_lock = first
    second_no, second_lock = second
    first_lock.acquire()
    print("Thread %d got lock %d." % (thread_no, first_no))
    time.sleep(1)
    second_lock.acquire()
    print("Thread %d got lock %d." % (thread_no, second_no))
    second_lock.release()
    first_lock.release()


class MyThread1(Thread):
    """Takes lock1 first, then lock2."""

    def run(self):
        _lock_in_turn(1, (1, lock1), (2, lock2))


class MyThread2(Thread):
    """Takes the same locks in the opposite order: lock2 first, then lock1."""

    def run(self):
        _lock_in_turn(2, (2, lock2), (1, lock1))


if __name__ == '__main__':
    # The two threads request the locks in opposite orders. Each typically
    # ends up holding one lock while waiting forever for the other: a
    # deadlock, so the program never exits.
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
# 解决方式是:给acquire()添加timeout参数
# (one remedy: call acquire(timeout=...) and back off when it returns False;
#  acquiring the locks in the same order in every thread avoids the deadlock)
生产者和消费者
from queue import Queue
from threading import Thread

q = Queue()


def produce(count=10):
    """Put ``count`` integers on ``q``, then wait until all are processed.

    ``q.join()`` blocks until ``task_done()`` has been called once for every
    item that was put. Without it this function would return right after
    the puts.
    """
    for i in range(count):
        q.put(i)
    print('生产任务完毕!')
    q.join()
    print(produce.__name__, '函数结束!')


def consumer(count=10):
    """Take ``count`` items from ``q`` and mark each one as processed.

    ``count`` must match what the producer puts; otherwise one side waits
    forever.
    """
    for _ in range(count):
        print('消费:', q.get())
        q.task_done()  # once per get(), so that q.join() can return
    print(consumer.__name__, '函数结束!')


if __name__ == '__main__':
    # Guarded like the other examples, so importing this module starts no threads.
    pro = Thread(target=produce)
    con = Thread(target=consumer)
    pro.start()
    con.start()
    con.join()
    pro.join()
    print('主进程结束')
"""Outputs:
生产任务完毕!
消费: 0
消费: 1
消费: 2
消费: 3
消费: 4
消费: 5
消费: 6
消费: 7
消费: 8
消费: 9
consumer 函数结束!
produce 函数结束!
主进程结束
"""
# 协程 (coroutines)
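协程(coroutine)是比线程更轻量的执行单元:多个协程运行在同一个线程里,由代码自己决定在哪里让出控制权(例如yield),切换开销比线程小得多。
在Python中可以使用生成器(yield/send)来实现协程;标准库asyncio提供基于async/await的原生协程,第三方库有greenlet、gevent。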
# Adapted from https://www.liaoxuefeng.com/wiki/897692888725344/923057403198272
import time


def consumer():
    """Generator-based coroutine: receive items via send(), reply '200 OK'.

    The first value yielded is ''. Any falsy value sent in (including the
    ``None`` that a bare next() delivers) ends the coroutine.
    """
    reply = ''
    while True:
        item = yield reply
        if not item:
            return
        print('[CONSUMER] Consuming %s...' % item)
        time.sleep(1)
        reply = '200 OK'


def produce(c):
    """Drive coroutine ``c``: prime it, send 1..5 one at a time, then close it."""
    next(c)  # advance to the first ``yield`` so that send() is allowed
    for n in range(1, 6):
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()


if __name__ == '__main__':
    produce(consumer())