即使一个人,也要活得像军队一样!

Python-- 进程、线程、协程

引子

假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源。是不是在程序A读取数据的过程中,让程序B去执行,当程序A读取完数据之后,让程序B暂停,然后让程序A继续执行?当然没问题,但这里有一个关键词:切换。既然是切换,那么这就涉及到了状态的保存,状态的恢复,加上程序A与程序B所需要的系统资源(内存,硬盘,键盘等等)是不一样的。自然而然的就需要有一个东西去记录程序A和程序B分别需要什么资源,怎样去识别程序A和程序B等等,所以就有了一个叫进程的抽象

进程定义

进程就是一个程序在一个数据集上的一次动态执行过程。
进程一般由程序、数据集、进程控制块三部分组成。
我们编写的程序用来描述进程要完成哪些功能以及如何完成;
数据集则是程序在执行过程中所需要使用的资源;
进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

线程

线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,使到进程内并发成为可能。

举例:
假设,一个文本程序,需要接受键盘输入,将内容显示在屏幕上,还需要保存信息到硬盘中。若只有 一个进程,势必造成同一时间只能干一样事的尴尬(当保存时,就不能通过键盘输入内容)。若有多个进程,每个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的任务,进程C负责保存内容到硬盘中的任务。这里进程A,B,C间的协作涉及到了进程通信问题,而且有共同都需要拥有的东西——-文本内容,不停的切换造成性能上的损失。
若有一种机制,可以使任务A,B,C共享资源,这样上下文切换所需要保存和恢复的内容就少了,同时又可以减少通信所带来的性能损耗,那就好了。是的,这种机制就是线程。

线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。线程没有自己的系统资源。

线程进程的关系区别

1 一个程序至少有一个进程,一个进程至少有一个线程.(进程可以理解成线程的容器)

2 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。

3 线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口
但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。

4 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调
度的一个独立单位.
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程
自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是
它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.

举一个不是很形象的例子:

1
2
3
4
5
6
7
8
9
10
11
银行有4个窗口, 每个窗口都可以存钱和取钱。4个窗口,也就限制了。同一时段,最多只能有4个客户在窗口进行存钱或取钱的操作。

窗口1 窗口2 窗口3 窗口4
存 取 存 取

假如,存钱和取钱都是需要时间的。A准备存1个亿的现金,在A准备的过程中(A点钞或者专门有点钞的人员)B要取钱。此时B就可以在柜台取钱。而不耽误B的时间。

窗 口 1 窗 口 2 窗 口 3 窗 口 4
存 | 取 存 | 取 存 | 取 存 | 取

现在也就是说,同一时段,理想状态最多有8个客户在窗口进行存钱或取钱。

何谓进程,何谓线程,我想你应该能够理清楚了。

Python 的 GIL

1
2
3
# In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

# 上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行

Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。
GIL:在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。
在调用任何Python C API之前,要先获得GIL
GIL缺点:多处理器退化为单处理器;优点:避免大量的加锁解锁操作

python的线程与threading模块

线程的两种调用方式

threading 模块建立在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块通过对thread进行二次封装,提供了更方便的api来处理线程。
直接调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import time


def sayhi(num): # 定义每个线程要运行的函数

print("running on number:%s" % num, "time-->", time.time())
time.sleep(num)
print("stop on number:%s" % num, "time-->", time.time())


if __name__ == '__main__':
t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一个线程实例
t2 = threading.Thread(target=sayhi, args=(2,)) # 生成另一个线程实例
t1.start() # 启动线程
t2.start() # 启动另一个线程
print("now time-->", time.time())
print('线程1--》》', t1.getName()) # 获取线程名
print('线程2--》》', t2.getName())

# output:
# running on number:1 time--> 1577152259.8146904
# running on number:2 time--> 1577152259.8146904
# now time--> 1577152259.8146904
# 线程1--》》 Thread-1
# 线程2--》》 Thread-2
# stop on number:1 time--> 1577152260.815012
# stop on number:2 time--> 1577152261.815092

继承调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import time


class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num

def run(self): # 定义每个线程要运行的函数
print("running on number:%s" % self.num, "time-->", time.time())
time.sleep(self.num)
print("stop on number:%s" % self.num, "time-->", time.time())


if __name__ == '__main__':

t1 = MyThread(1) # 生成一个线程实例
t2 = MyThread(2) # 生成另一个线程实例
t1.start() # 启动线程
t2.start() # 启动另一个线程

print("now time-->", time.time())
print('线程1--》》', t1.getName()) # 获取线程名
print('线程2--》》', t2.getName())

# output:
# running on number:1 time--> 1577155042.4301505
# running on number:2 time--> 1577155042.4301505
# now time--> 1577155042.4301505
# 线程1--》》 Thread-1
# 线程2--》》 Thread-2
# stop on number:1 time--> 1577155043.4311821
# stop on number:2 time--> 1577155044.4308763

threading.thread的实例方法

join()和setDaemon()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。

# setDaemon(True):
'''
将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。

当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成

想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是只要主线程

完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦'''


import threading
from time import ctime, sleep
import time


def music(name):
print("Begin listening to {name}. {time}".format(name=name, time=ctime()))
sleep(3)
print("end listening {time}".format(time=ctime()))


def blog(title):
print("Begin recording the {title}. {time}".format(title=title, time=ctime()))
sleep(5)
print('end recording {time}'.format(time=ctime()))


threads = []
t1 = threading.Thread(target=music, args=('过火',)) # 生成线程实例
t2 = threading.Thread(target=blog, args=('博客园',))
threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
for t in threads:
t.start()
print("all over %s" % ctime())

# output:
# Begin listening to 过火. Tue Dec 24 10:49:41 2019
# Begin recording the 博客园. Tue Dec 24 10:49:41 2019
# all over Tue Dec 24 10:49:41 2019
# end listening Tue Dec 24 10:49:44 2019
# end recording Tue Dec 24 10:49:46 2019
1
2
3
4
5
6
7
8
9
10
11
12
13
14
if __name__ == '__main__':
for t in threads:
t.start()
t.join() # 等待执行完毕
print("all over %s" % ctime())

# output:
# Begin listening to 过火. Tue Dec 24 10:52:43 2019
# end listening Tue Dec 24 10:52:46 2019
# Begin recording the 博客园. Tue Dec 24 10:52:46 2019
# end recording Tue Dec 24 10:52:51 2019
# all over Tue Dec 24 10:52:51 2019

# 先听歌、再看博客、再结束
1
2
3
4
5
6
7
8
9
10
11
12
13
if __name__ == '__main__':
for t in threads:
t.start()

t1.join() # 等待执行完毕
print("all over %s" % ctime())

# output:
# Begin listening to 过火. Tue Dec 24 11:01:21 2019
# Begin recording the 博客园. Tue Dec 24 11:01:21 2019
# end listening Tue Dec 24 11:01:24 2019
# all over Tue Dec 24 11:01:24 2019
# end recording Tue Dec 24 11:01:26 2019
1
2
3
4
5
6
7
8
9
10
11
12
13
if __name__ == '__main__':
for t in threads:
t.start()

t2.join() # 等待执行完毕
print("all over %s" % ctime())

# output:
# Begin listening to 过火. Tue Dec 24 11:03:01 2019
# Begin recording the 博客园. Tue Dec 24 11:03:01 2019
# end listening Tue Dec 24 11:03:04 2019
# end recording Tue Dec 24 11:03:06 2019
# all over Tue Dec 24 11:03:06 2019
1
2
3
4
5
6
7
8
9
10
if __name__ == '__main__':
for t in threads:
t.setDaemon(True) # 注意:一定在start之前设置
t.start()
print("all over %s" % ctime())

# output:
# Begin listening to 过火. Tue Dec 24 11:03:52 2019
# Begin recording the 博客园. Tue Dec 24 11:03:52 2019
# all over Tue Dec 24 11:03:52 2019
1
2
3
4
5
6
7
8
9
10
11
12
if __name__ == '__main__':
t1.setDaemon(True) # 注意:一定在start之前设置
for t in threads:
t.start()
print("all over %s" % ctime())

# output:
# Begin listening to 过火. Tue Dec 24 11:05:57 2019
# Begin recording the 博客园. Tue Dec 24 11:05:57 2019
# all over Tue Dec 24 11:05:57 2019
# end listening Tue Dec 24 11:06:00 2019
# end recording Tue Dec 24 11:06:02 2019
1
2
3
4
5
6
7
8
9
10
11
if __name__ == '__main__':
t2.setDaemon(True) # 注意:一定在start之前设置
for t in threads:
t.start()
print("all over %s" % ctime())

# output:
# Begin listening to 过火. Tue Dec 24 11:05:25 2019
# Begin recording the 博客园. Tue Dec 24 11:05:25 2019
# all over Tue Dec 24 11:05:25 2019
# end listening Tue Dec 24 11:05:28 2019

其它方法

1
2
3
4
5
6
7
8
9
10
# run():  线程被cpu调度后自动执行线程对象的run方法
# start():启动线程活动。
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。

# threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

同步锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
import threading


def add_num():
global num # 在每个线程中都获取这个全局变量
temp = num
# print('--get num:', num)
time.sleep(0.1)
num = temp-1 # 对此公共变量进行-1操作


num = 100 # 设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=add_num)
t.start()
thread_list.append(t)

for t in thread_list: # 等待所有线程执行完毕
t.join()

print('final num:', num)

# output:
# final num: 99

观察:time.sleep(0.1) /0.001/0.0000001 结果分别是多少?

多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?(join会造成串行,失去所线程的意义)

我们可以通过同步锁来解决这种问题.

1
2
3
4
5
6
7
8
9
R=threading.Lock()

def sub():
global num
R.acquire()
temp=num-1
time.sleep(0.1)
num=temp
R.release()

锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:

1
2
3
4
5
6
7
8
9
import threading

R=threading.Lock()

R.acquire()
'''
对公共数据的操作
'''
R.release()

思考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
'''
1、为什么有了GIL,还需要线程同步?

多线程环境下必须存在资源的竞争,那么如何才能保证同一时刻只有一个线程对共享资源进行存取?

加锁, 对, 加锁可以保证存取操作的唯一性, 从而保证同一时刻只有一个线程对共享数据存取.

通常加锁也有2种不同的粒度的锁:

coarse-grained(粗粒度): python解释器层面维护着一个全局的锁机制,用来保证线程安全。
内核级通过GIL实现的互斥保护了内核的共享资源。

fine-grained(细粒度): 那么程序员需要自行地加,解锁来保证线程安全,
用户级通过自行加锁保护的用户程序的共享资源。

2、GIL为什么限定在一个进程上?

你写一个py程序,运行起来本身就是一个进程,这个进程是有解释器来翻译的,所以GIL限定在当前进程;
如果又创建了一个子进程,那么两个进程是完全独立的,这个字进程也是有python解释器来运行的,所以
这个子进程上也是受GIL影响的


'''

线程死锁和递归锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。下面是一个死锁的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()


class MyThread(threading.Thread):

def __init__(self):
threading.Thread.__init__(self)

def run(self):
self.fun1()
self.fun2()

def fun1(self):
mutexA.acquire() # 如果锁被占用,则阻塞在这里,等待锁的释放
print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutexB.acquire()
print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
mutexB.release()
mutexA.release()

def fun2(self):
mutexB.acquire()
print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
time.sleep(0.2)

mutexA.acquire()
print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutexA.release()
mutexB.release()


if __name__ == "__main__":

print("start---------------------------%s"%time.time())
for i in range(0, 10):
my_thread = MyThread()
my_thread.start()

在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

1
2
mutex = threading.RLock()
mutexA = mutexB = mutex

使用场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import threading


class Account:
def __init__(self, _id, balance):
self.id = _id
self.balance = balance
self.lock = threading.RLock()

def withdraw(self, amount):
with self.lock:
self.balance -= amount

def deposit(self, amount):
with self.lock:
self.balance += amount

def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的场景
with self.lock:
interest = 0.05
count = amount+amount*interest
self.withdraw(count)


def transfer(_from, to, amount):
# 锁不可以加在这里 因为其他的其它线程执行的其它方法在不加锁的情况下数据同样是不安全的
_from.withdraw(amount)
to.deposit(amount)


alex = Account('alex', 1000)
yuan = Account('yuan', 1000)

t1=threading.Thread(target=transfer, args=(alex, yuan, 100))
t1.start()

t2=threading.Thread(target = transfer, args = (yuan,alex, 200))
t2.start()

t1.join()
t2.join()

print('>>>',alex.balance)
print('>>>',yuan.balance)

# output:
# >>> 1100
# >>> 900

Event对象

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

1
2
3
4
5
6
7
# event.isSet():返回event的状态值;

# event.wait():如果 event.isSet()==False将阻塞线程;

# event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

# event.clear():恢复event的状态值为False。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import threading, time


class Boss(threading.Thread):
def run(self):
print("BOSS:今晚大家都要加班到22:00。")
print(event.isSet())
event.set()
time.sleep(5)
print("BOSS:<22:00>可以下班了。")
print(event.isSet())
event.set()


class Worker(threading.Thread):
def run(self):
event.wait()
print("Worker:哎……命苦啊!")
time.sleep(1)
event.clear()
event.wait()
print("Worker:OhYeah!")


if __name__ == "__main__":
event = threading.Event()
threads = []
for i in range(5):
threads.append(Worker())
threads.append(Boss())
for t in threads:
t.start()
for t in threads:
t.join()

# output:
# BOSS:今晚大家都要加班到22:00。
# False
# Worker:哎……命苦啊!
# Worker:哎……命苦啊!
# Worker:哎……命苦啊!
# Worker:哎……命苦啊!
# Worker:哎……命苦啊!
# BOSS:<22:00>可以下班了。
# False
# Worker:OhYeah!
# Worker:OhYeah!
# Worker:OhYeah!
# Worker:OhYeah!
# Worker:OhYeah!

threading.Event的wait方法还接受一个超时参数,默认情况下如果事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数之后,如果阻塞时间超过这个参数设定的值之后,wait方法会返回。

Semaphore(信号量)

信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time

semaphore = threading.Semaphore(5)

def func():
if semaphore.acquire():
print (threading.currentThread().getName() + ' get semaphore')
time.sleep(2)
semaphore.release()

for i in range(20):
t1 = threading.Thread(target=func)
t1.start()

应用:连接池

思考:与Rlock的区别?

生产者消费者模型

为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
count = 0
while count <10:
print("making........")
time.sleep(random.randrange(3))
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
#q.task_done()
#q.join()
print("ok......")
def Consumer(name):
count = 0
while count <10:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
#q.task_done()
#q.join()
print(data)
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
else:
print("-----no baozi anymore----")
count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()

多进程模块 multiprocessing

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

python的进程调用

直接调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process
import time


def f(name):
print('hello', name, time.ctime())
time.sleep(1)


if __name__ == '__main__':
p_list = []
for i in range(3):
p = Process(target=f, args=('alvin:%s' % i,))
p_list.append(p)
p.start()
for p in p_list:
p.join()
print('end')

# output:
# hello alvin:0 Tue Dec 24 13:58:18 2019
# hello alvin:1 Tue Dec 24 13:58:18 2019
# hello alvin:2 Tue Dec 24 13:58:18 2019
# end

继承调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from multiprocessing import Process
import time


class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
# self.name = name

def run(self):
print('hello', self.name, time.ctime())
time.sleep(1)


if __name__ == '__main__':
p_list = []
for i in range(3):
p = MyProcess()
p_list.append(p)
p.start()
for p in p_list:
p.join()
print('end')

# output:
# hello MyProcess-1 Tue Dec 24 14:01:52 2019
# hello MyProcess-2 Tue Dec 24 14:01:52 2019
# hello MyProcess-3 Tue Dec 24 14:01:52 2019
# end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 构造方法:

# Process([group [, target [, name [, args [, kwargs]]]]])

#   group: 线程组,目前还没有实现,库引用中提示必须是None;
#   target: 要执行的方法;
#   name: 进程名;
#   args/kwargs: 要传入方法的参数。

# 实例方法:

#   is_alive():返回进程是否在运行。
#   join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
#   start():进程准备就绪,等待CPU调度
#   run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
#   terminate():不管任务是否完成,立即停止工作进程
# 属性:
#   daemon:和线程的setDeamon功能一样
#   name:进程名字。
#   pid:进程号。

进程间通讯

进程队列Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process, Queue


def f(q, n):
q.put(n*n+1)
print("son process", id(q))


if __name__ == '__main__':
q = Queue()
print("main process", id(q))

for i in range(3):
p = Process(target=f, args=(q, i))
p.start()

print(q.get())
print(q.get())
print(q.get())

# output:
# main process 1774500620384
# son process 1894343054952
# 1
# son process 1229893801576
# 5
# son process 1879186937336
# 2

管道

Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()和recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process, Pipe


def f(conn):
conn.send([12, {"name": "yuan"}, 'hello'])
response = conn.recv()
print("response", response)
conn.close()


if __name__ == '__main__':

parent_conn, child_conn = Pipe()
print("p_ID:", id(parent_conn), "q_ID1:", id(child_conn))
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
parent_conn.send("儿子你好!")
p.join()

# output:
# p_ID: 1559578636640 q_ID1: 1559578637368
# [12, {'name': 'yuan'}, 'hello']
# response 儿子你好!

manager

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from multiprocessing import Process, Manager


def f(d, l, n):
d[n] = n
d["name"] ="alvin"
l.append(n)

print("进程:", n, "列表L:",l)


if __name__ == '__main__':

with Manager() as manager:
d = manager.dict()
l = manager.list(range(5))
p_list = []

for i in range(10):
p = Process(target=f, args=(d,l,i))
p.start()
p_list.append(p)

for res in p_list:
res.join()

print(d)
print(l)


# output:
# 进程: 2 列表L: [0, 1, 2, 3, 4, 2]
# 进程: 5 列表L: [0, 1, 2, 3, 4, 2, 5]
# 进程: 7 列表L: [0, 1, 2, 3, 4, 2, 5, 7]
# 进程: 3 列表L: [0, 1, 2, 3, 4, 2, 5, 7, 3]
# 进程: 9 列表L: [0, 1, 2, 3, 4, 2, 5, 7, 3, 9]
# 进程: 6 列表L: [0, 1, 2, 3, 4, 2, 5, 7, 3, 9, 6]
# 进程: 4 列表L: [0, 1, 2, 3, 4, 2, 5, 7, 3, 9, 6, 4]
# 进程: 8 列表L: [0, 1, 2, 3, 4, 2, 5, 7, 3, 9, 6, 4, 8]
# 进程: 0 列表L: [0, 1, 2, 3, 4, 2, 5, 7, 3, 9, 6, 4, 8, 0]
# 进程: 1 列表L: [0, 1, 2, 3, 4, 2, 5, 7, 3, 9, 6, 4, 8, 0, 1]
# {2: 2, 'name': 'alvin', 5: 5, 7: 7, 3: 3, 9: 9, 6: 6, 4: 4, 8: 8, 0: 0, 1: 1}
# [0, 1, 2, 3, 4, 2, 5, 7, 3, 9, 6, 4, 8, 0, 1]

进程同步

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Lock

def f(l, i):

with l.acquire():
print('hello world %s'%i)

if __name__ == '__main__':
lock = Lock()

for num in range(10):
Process(target=f, args=(lock, num)).start()

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Pool
import time


def foo(args):
time.sleep(1)
print(args)


if __name__ == '__main__':
p = Pool(5)
for i in range(10):
p.apply_async(func=foo, args=(i,))

p.close() # 等子进程执行完毕后关闭线程池
# time.sleep(2)
# p.terminate() # 立刻关闭线程池
p.join()

进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有以下几个主要方法:

apply:从进程池里取一个进程并执行
apply_async:apply的异步版本
terminate:立刻关闭线程池
join:主进程等待所有子进程执行完毕,必须在close或terminate之后
close:等待所有进程结束后,才关闭线程池

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

yield与协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import time

"""
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
"""
# 注意到consumer函数是一个generator(生成器):
# 任何包含yield关键字的函数都会自动成为生成器(generator)对象

def consumer():
r = ''
while True:
# 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
# yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
# 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
# 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
n = yield r
if not n:
return
print('[CONSUMER] ←← Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
# 1、首先调用c.next()启动生成器
next(c)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] →→ Producing %s...' % n)
# 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
cr = c.send(n)
# 4、produce拿到consumer处理的结果,继续生产下一条消息;
print('[PRODUCER] Consumer return: %s' % cr)
# 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
c.close()
if __name__=='__main__':
# 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
c = consumer()
produce(c)

'''
result:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''

greenlet

greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的”Coroutine(协程)”的一个基础库.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from greenlet import greenlet

def test1():
print (12)
gr2.switch()
print (34)
gr2.switch()

def test2():
print (56)
gr1.switch()
print (78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

# output:
# 12
# 56
# 34
# 78

基于greenlet的框架–gevent模块实现协程

Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import gevent
import time


def foo():
print("running in foo")
gevent.sleep(2)
print("switch to foo again")


def bar():
print("switch to bar")
gevent.sleep(5)
print("switch to bar again")


start = time.time()

gevent.joinall(
[gevent.spawn(foo), gevent.spawn(bar)]
)

print(time.time()-start)

# output:
# running in foo
# switch to bar
# switch to foo again
# switch to bar again
# 5.0048134326934814

当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time


def f(url):
print('GET: %s' % url)
start_time = time.time()
resp = request.urlopen(url)
data = resp.read()
print('%d bytes received from %s. time: %s' % (len(data), url, time.time() - start_time))


start = time.time()

gevent.joinall([
gevent.spawn(f, 'https://itk.org/'),
gevent.spawn(f, 'https://www.baidu.com/'),
gevent.spawn(f, 'https://zhihu.com/'),
])

print('-----')
f('https://itk.org/')
f('https://www.baidu.com/')
f('https://zhihu.com/')

print(time.time()-start)


# output:
# GET: https://itk.org/
# GET: https://www.baidu.com/
# GET: https://zhihu.com/
# 227 bytes received from https://www.baidu.com/. time: 0.14103436470031738
# 46368 bytes received from https://zhihu.com/. time: 0.7752258777618408
# 27383 bytes received from https://itk.org/. time: 3.584683895111084
# -----
# GET: https://itk.org/
# 27383 bytes received from https://itk.org/. time: 2.513073682785034
# GET: https://www.baidu.com/
# 227 bytes received from https://www.baidu.com/. time: 0.13568687438964844
# GET: https://zhihu.com/
# 46368 bytes received from https://zhihu.com/. time: 0.815751314163208
# 7.051190376281738

协程的好处

无需线程上下文切换的开销
无需原子操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序.

– end –

-------------本文结束感谢您的阅读-------------