python消息队列

消息队列简介

消息队列”是在消息的传输过程中保存消息的容器。
消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。

消息队列在multiprocessing的实现

操作系统提供了很多机制来实现进程间的通信 ,multiprocessing模块就提供了Queue和Pipe两种方法来实现。

Queue实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process, Queue
import os, time, random

def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入
pw.start()
# 启动子进程pr,读取
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
r.terminate()

输出结果

1
2
3
4
5
6
7
8
Process to write: 32532
Process to read: 28588
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Pipe实现

Pipe方法返回(conn1, conn2)代表一个管道的两个端。
Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
send和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接受结束以后,关闭管道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import multiprocessing
import time

def proc1(pipe):
for i in range(5):
print("send: %s" %(i))
pipe.send(i)
# print(dir(pipe))
time.sleep(1)

def proc2(pipe):
n = 5
while n:
print("proc2 rev:", pipe.recv())
time.sleep(1)
n -= 1

if __name__ == "__main__":
pipe = multiprocessing.Pipe(duplex=False)
p1 = multiprocessing.Process(target=proc1, args=(pipe[1],))
p2 = multiprocessing.Process(target=proc2, args=(pipe[0],))
p1.start()
p2.start()
p1.join()
p2.join()
pipe[0].close()
pipe[1].close()

输出结果

1
2
3
4
5
6
7
8
9
10
send: 0
proc2 rev: 0
send: 1
proc2 rev: 1
send: 2
proc2 rev: 2
send: 3
proc2 rev: 3
send: 4
proc2 rev: 4

queue模块实现消息队列

在python中还可以直接使用queue模块专门实现消息队列。
queue对象实现一个fifo队列(其他的还有lifo、priority队列,这里不再介绍)。queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数

  • queue.qsize()
    返回消息队列的当前空间。返回的值不一定可靠。
  • queue.empty()
    判断消息队列是否为空,返回True或False。同样不可靠。
  • queue.full()
    类似上边,判断消息队列是否满。
  • queue.put(item, block=True, timeout=None)
    往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。
  • queue.put_nowait(item)
    相当于put(item, False)。
  • queue.get(block=True, timeout=None)
    获取一个消息,其他同put。
    以下两个函数用来判断消息对应的任务是否完成。
  • queue.task_done()
    接受消息的线程通过调用这个函数来说明消息对应的任务已完成。
  • queue.join()
    实际上意味着等到队列为空,再执行别的操作。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    import threading, time
    from queue import Queue

    # Producer thread
    class Producer(threading.Thread):
    def __init__(self, t_name, queue):
    threading.Thread.__init__(self, name=t_name)
    self.data = queue

    def run(self):
    for i in range(5): # 随机产生10个数字 ,可以修改为任意大小
    print("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i))
    self.data.put(i) # 将数据依次存入队列
    print("%s: %s finished!" % (time.ctime(), self.getName()))


    # Consumer thread
    class Consumer_even(threading.Thread):
    def __init__(self, t_name, queue):
    threading.Thread.__init__(self, name=t_name)
    self.data = queue

    def run(self):
    while 1:
    try:
    val_even = self.data.get(1, 5) # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
    if val_even % 2 == 0:
    print("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_even))
    time.sleep(2)
    else:
    self.data.put(val_even)
    time.sleep(2)
    except: # 等待输入,超过5秒 就报异常
    print("%s: %s finished!" % (time.ctime(), self.getName()))
    break

    class Consumer_odd(threading.Thread):
    def __init__(self, t_name, queue):
    threading.Thread.__init__(self, name=t_name)
    self.data = queue

    def run(self):
    while 1:
    try:
    val_odd = self.data.get(1, 5)
    if val_odd % 2 != 0:
    print("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd))
    time.sleep(2)
    else:
    self.data.put(val_odd)
    time.sleep(2)
    except:
    print("%s: %s finished!" % (time.ctime(), self.getName()))
    break

    # Main thread
    def main():
    queue = Queue()
    producer = Producer('Pro.', queue)
    consumer_even = Consumer_even('Con_even.', queue)
    consumer_odd = Consumer_odd('Con_odd.', queue)
    producer.start()
    consumer_even.start()
    consumer_odd.start()
    producer.join()
    consumer_even.join()
    consumer_odd.join()
    print('All threads terminate!')

    if __name__ == '__main__':
    main()

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Fri May 25 22:10:01 2018: Pro. is producing 0 to the queue!
Fri May 25 22:10:01 2018: Pro. is producing 1 to the queue!
Fri May 25 22:10:01 2018: Pro. is producing 2 to the queue!
Fri May 25 22:10:01 2018: Pro. is producing 3 to the queue!
Fri May 25 22:10:01 2018: Pro. is producing 4 to the queue!
Fri May 25 22:10:01 2018: Pro. finished!
Fri May 25 22:10:01 2018: Con_even. is consuming. 0 in the queue is consumed!
Fri May 25 22:10:01 2018: Con_odd. is consuming. 1 in the queue is consumed!
Fri May 25 22:10:05 2018: Con_even. is consuming. 4 in the queue is consumed!
Fri May 25 22:10:13 2018: Con_odd. is consuming. 3 in the queue is consumed!
Fri May 25 22:10:13 2018: Con_even. is consuming. 2 in the queue is consumed!
Fri May 25 22:10:20 2018: Con_even. finished!
Fri May 25 22:10:20 2018: Con_odd. finished!
All threads terminate!

Recommended Posts