进程简介
进程是程序在计算机上的一次执行活动。当你运行一个程序,你就启动了一个进程。显然,程序是死的(静态的),进程是活的(动态的)。进程可以分为系统进程和用户进程。凡是用于完成操作系统的各种功能的进程就是系统进程,它们就是处于运行状态下的操作系统本身;用户进程就不必我多讲了吧,所有由你启动的进程都是用户进程。进程是操作系统进行资源分配的单位。
在操作系统的管理下,所有正在运行的进程轮流使用CPU,每个进程允许占用CPU的时间非常短(比如10毫秒),这样用户根本感觉不出来CPU是在轮流为多个进程服务,就好象所有的进程都在不间断地运行一样。但实际上在任何一个时间内有且仅有一个进程占有CPU。
进程和线程
进程(process)和线程(thread)
单个CPU一次只能运行一个任务;在任一时刻,CPU总是运行一个进程,其他进程处于非运行状态;
一个进程可以包含多个线程;
进程没有任何共享状态,进程修改的数据,改动仅限于该进程内;
一个进程的内存空间是共享的,每个线程都可以使用这些共享内存;
一个线程使用某些共享内存时,其他线程必须等它结束才能使用这一块内存;防止多个线程同时读写某一块内存区域,采用互斥锁(Mutual exclusion,缩写Mutex);
某些内存区域只能供给固定数目的线程使用,此时通过信号量(Semaphore)保证多个线程不会互相冲突;
多进程形式,运行多个任务同时运行;多线程形式,允许单个任务分成不同的部分运行;
多线程使用的是cpu的一个核,适合io密集型;
多进程使用的是cpu的多个核,适合运算密集型。
在linux中可以使用ps -efL查看进程和线程ID。以memcached进程为例,输出结果如下1
2
3
4
5
6
7
8
9
10
11
12[root@VM_0_4_centos ~]# ps -efL |grep memcached
root 24421 1 24421 0 10 May19 ? 00:00:03 memcached -d -u root
root 24421 1 24422 0 10 May19 ? 00:00:01 memcached -d -u root
root 24421 1 24423 0 10 May19 ? 00:00:00 memcached -d -u root
root 24421 1 24424 0 10 May19 ? 00:00:00 memcached -d -u root
root 24421 1 24425 0 10 May19 ? 00:00:00 memcached -d -u root
root 24421 1 24426 0 10 May19 ? 00:00:00 memcached -d -u root
root 24421 1 24427 0 10 May19 ? 00:00:00 memcached -d -u root
root 24421 1 24428 0 10 May19 ? 00:00:00 memcached -d -u root
root 24421 1 24429 0 10 May19 ? 00:00:09 memcached -d -u root
root 24421 1 24430 0 10 May19 ? 00:00:00 memcached -d -u root
root 32169 31101 32169 0 1 23:23 pts/0 00:00:00 grep --color=auto memcached
第一行UID
(用户ID),第二行为PID
(进程ID),第三行PPID
(父进程ID),第四行LWP
(线程ID)。
从示例可以看出,进程24421子进程有10个,对应线程ID分别为24421-24430。
multiprocess
python中的多线程无法利用多核优势,若要充分使用多核CPU资源,在python中大部分情况使用多进程。python提供了非常好用的多进程包multiprocessing。
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
Process类
创建一个Process对象1
p = multiprocessing.Process(target=worker_1, args=(2, ))
参数
target:函数名字
args:函数需要的参数,以tuple的形式传入(单个元素的tuple必须有逗号)方法
p.is_alive() 判断进程p是否存活,是返回True
p.run() 启动进程,它去调用target指定的函数
p.start() 启动进程,它会自动调用run方法,推荐使用start
p.join(timeout) 主线程等待p终止(主线程处于等的状态,p处于运行状态)。p.join只能join使用start开启的进程,不能join使用run开启的进程
p.terminate() 强制进程p退出,不会进行任何清理操作,如果p创建了子进程,该子进程就变成了僵尸进程- 属性
p.name 进程的名字
p.pid 进程的pid
p.daemon 默认为False,如果设置为True代表p为后台运行的守护进程,当p的父进程终止时p也随之终止,并且设置为True后,p不能创建自己的新进程,必须在p.start()之前设置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import multiprocessing
import time
def worker(args, interval):
print("start worker {0}".format(args))
time.sleep(interval)
print("end worker {0}".format(args))
def main():
print("start main")
p1 = multiprocessing.Process(target=worker, args=(1, 1))
p2 = multiprocessing.Process(target=worker, args=(2, 2))
p3 = multiprocessing.Process(target=worker, args=(3, 3))
p1.start()
p2.start()
p3.start()
print("end main")
if __name__ == '__main__':
main()
输出结果1
2
3
4
5
6
7
8start main
end main
start worker 1
start worker 2
start worker 3
end worker 1
end worker 2
end worker 3
multprocessing用到的两个方法
cpu_count():统计cpu总数
active_children():获得所有子进程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import multiprocessing
import time
def worker(args, interval):
print("start worker {0}".format(args))
time.sleep(interval)
print("end worker {0}".format(args))
def main():
print("start main")
p1 = multiprocessing.Process(target=worker, args=(1, 1))
p2 = multiprocessing.Process(target=worker, args=(2, 2))
p3 = multiprocessing.Process(target=worker, args=(3, 3))
p1.start()
p1.join(timeout=0.5) #此处保证了p1优先执行
p2.start()
p3.start()
print("the number of CPU is: {0}".format(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print("The name of active children is: {0}, pid is: {1} is alive".format(p.name, p.pid))
print("end main")
if __name__ == '__main__':
main()
输出结果1
2
3
4
5
6
7
8
9
10
11
12start main
start worker 1
the number of CPU is: 4
The name of active children is: Process-1, pid is: 25360 is alive
The name of active children is: Process-2, pid is: 24500 is alive
The name of active children is: Process-3, pid is: 26100 is alive
end main
start worker 3
start worker 2
end worker 1
end worker 2
end worker 3
lock组件
当我们用多进程来读写文件的时候,如果一个进程是写文件,一个进程是读文件,如果两个文件同时进行,肯定是不行的,必须是文件写结束以后,才可以进行读操作。或者是多个进程在共享一些资源的时候,同时只能有一个进程进行访问,那就要有一个锁机制进行控制。
下面使用2个进程分别进行+1
和+3
操作为例
- 不加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import time
import multiprocessing
def add(value, number):
print("start add{0} number= {1}".format(value, number))
for i in range(1, 3):
number += value
time.sleep(0.3)
print("number = {0}".format(number))
if __name__ == '__main__':
print("start main")
number = 0
p1 = multiprocessing.Process(target=add, args=(1, number))
p3 = multiprocessing.Process(target=add, args=(3, number))
p1.start()
p3.start()
print("end main")
输出结果1
2
3
4
5
6
7
8start main
end main
start add1 number= 0
start add3 number= 0
number = 1
number = 3
number = 2
number = 6
- 加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import time
import multiprocessing
def add(lock, value, number):
with lock:
print("start add{0} number= {1}".format(value, number))
for i in range(1, 3):
number += value
time.sleep(0.3)
print("number = {0}".format(number))
if __name__ == '__main__':
print("start main")
number = 0
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=add, args=(lock, 1, number))
p3 = multiprocessing.Process(target=add, args=(lock, 3, number))
p1.start()
p3.start()
print("end main")
输出结果1
2
3
4
5
6
7
8start main
end main
start add1 number= 0
number = 1
number = 2
start add3 number= 0
number = 3
number = 6
锁的获取可以使用lock.acquire()
获取,lock.release()
释放1
2
3
4
5
6
7
8
9
10
11
12
13def add(lock, value, number):
lock.acquire()
print("start add3 number= {0}".format(number))
try:
for i in range(1, 5):
number += value
time.sleep(0.3)
print("number = {0}".format(number))
except Exception as e:
raise e
finally:
lock.release()
pass
共享内存
一般变量在进程之间是没法进行通讯的,但是multiprocessing提供了Value
和Array
模块,可以在不同的进程中使用同一变量。Value
和Array
结构内部都实现了锁机制,因此多进程是安全的。
Value和Array都需要设置其中存放值的类型,d是double类型,i是int类型。类型设置和array
模块的值类似,更多的类型可以点击array — Efficient arrays of numeric values查看。
上面的示例中,两个进程执行后number结果分别为2和6,假如两个进程可以共享变量,name输出结果将会是8。
- Value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import multiprocessing
from multiprocessing import Value
def add(value, number):
print("start add{0} number= {1}".format(value, number.value))
for i in range(1, 3):
number.value += value
print("number = {0}".format(number.value))
if __name__ == '__main__':
print("start main")
number = Value('d', 0) #使用Value创建变量,d表示数据类型为double
p1 = multiprocessing.Process(target=add, args=(1, number))
p3 = multiprocessing.Process(target=add, args=(3, number))
p1.start()
p3.start()
print("end main")
输出结果1
2
3
4
5
6
7
8start main
end main
start add1 number= 0.0
start add3 number= 0.0
number = 1.0
number = 4.0
number = 5.0
number = 8.0
number最终结果是8,但是具体输出结果每次执行可能存在差异。
- Array
1
2
3
4
5
6
7
8
9
10
11
12
13from multiprocessing import Array, Process
def worker(arr):
for i in range(len(arr)):
arr[i] = -arr[i]
if __name__ == '__main__':
arr = Array('i', [x for x in range(6)]) #定义Array类型变量,i表示数据类型为int
print(arr[:])
p = Process(target=worker, args=(arr,))
p.start()
p.join() #等待子进程执行完成
print(arr[:])
输出结果1
2[0, 1, 2, 3, 4, 5]
[0, -1, -2, -3, -4, -5]
Manager
共享内存的实现,除了使用Value
和Array
,还可以使用Manager
实现。
Manager()返回的manager对象通过一个服务进程,让其他进程通过代理的方式从挨揍python对象。manager对象支持list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19from multiprocessing import Manager, Process
def f(d, l):
d[1] = '1'
d[2] = '2'
d[0.25] = None
l.reverse()
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
输出结果1
2{1: '1', 2: '2', 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Pool
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。
进程池提供阻塞和非阻塞两种方法Pool.apply_async
非阻塞,定义的进程池进程最大数可以同时执行Pool.apply
一个进程结束,释放回进程池,下一个进程才可以开始
- 使用
Pool.apply
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17import time
import multiprocessing
def fun(msg):
print("_start {0}".format(msg))
time.sleep(3)
print("_end {0}".format(msg))
if __name__ == '__main__':
print("start main")
pool = multiprocessing.Pool(processes=2)
for i in range(1, 3):
msg = "hello {0}".format(i)
pool.apply(fun, (msg,))
pool.close()#在调用join之前,要先调用close,否则会报错,close执行完不会有新的进程加入到pool
pool.join()#join 是等待所有的子进程结束
print("end main")
输出结果1
2
3
4
5
6start main
_start hello 1
_end hello 1
_start hello 2
_end hello 2
end main
输出结果为按顺序,一个进程结束再进行下一个
- 使用
Pool.apply_async
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import time
import multiprocessing
def fun(msg):
print("_start {0}".format(msg))
time.sleep(3)
print("_end {0}".format(msg))
if __name__ == '__main__':
print("start main")
pool = multiprocessing.Pool(processes=2)
for i in range(1, 3):
msg = "hello {0}".format(i)
pool.apply_async(fun, (msg,))
pool.close()
pool.join()
print("end main")
输出结果1
2
3
4
5
6start main
_start hello 1
_start hello 2
_end hello 1
_end hello 2
end main
此时两个进程可以同时执行。
更多关于multiprocessing
的内容可以点击multiprocessing — Process-based parallelism查看官方介绍。