Python queue队列

作用:

Queue

Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递

   解耦:使程序直接实现松耦合,修改一个函数,不会有串联关系。

基本FIFO队列

class Queue.Queue(maxsize=0)

FIFO即First in First
Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

举个栗子:

1 import Queue
2 
3 q = Queue.Queue()
4 
5 for i in range(5):
6     q.put(i)
7 
8 while not q.empty():
9     print q.get()

 

输出:

0
1
2
3
4

 

   提高处理效率:FIFO = 现进先出,LIFO = 后入先出。

LIFO队列

class Queue.LifoQueue(maxsize=0)

LIFO即Last in First
Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上

再举个栗子:

1 import Queue
2 
3 q = Queue.LifoQueue()
4 
5 for i in range(5):
6     q.put(i)
7 
8 while not q.empty():
9     print q.get()

 

输出:

4
3
2
1
0

 

可以看到仅仅是将Queue.Quenu类替换为Queue.LifiQueue类

 

优先级队列

class Queue.PriorityQueue(maxsize=0)

构造一个优先队列。maxsize用法同上。

import Queue
import threading

class Job(object):
    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print 'Job:',description
        return
    def __cmp__(self, other):
        return cmp(self.priority, other.priority)

q = Queue.PriorityQueue()

q.put(Job(3, 'level 3 job'))
q.put(Job(10, 'level 10 job'))
q.put(Job(1, 'level 1 job'))

def process_job(q):
    while True:
        next_job = q.get()
        print 'for:', next_job.description
        q.task_done()

workers = [threading.Thread(target=process_job, args=(q,)),
        threading.Thread(target=process_job, args=(q,))
        ]

for w in workers:
    w.setDaemon(True)
    w.start()

q.join()
结果
Job: level 3 job
Job: level 10 job
Job: level 1 job
for: level 1 job
for: level 3 job
for: job: level 10 job

 

队列:

一些常用方法

  队列可以并发的派多个线程,对排列的线程处理,并切每个需要处理线程只需要将请求的数据放入队列容器的内存中,线程不需要等待,当排列完毕处理完数据后,线程在准时来取数据即可。请求数据的线程只与这个队列容器存在关系,处理数据的线程down掉不会影响到请求数据的线程,队列会派给其他线程处理这分数据,它实现了解耦,提高效率。队列内会有一个有顺序的容器,列表与这个容器是有区别的,列表中数据虽然是排列的,但数据被取走后还会保留,而队列中这个容器的数据被取后将不会保留。当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。

task_done()

意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

 

join()

阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

 

put(item[, block[, timeout]])

将item放入队列中。

  1. 如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
  2. 如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
  3. 如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常

其非阻塞版本为put_nowait等同于put(item, False)

参数介绍:

get([block[, timeout]])

从队列中移除并返回一个数据。block跟timeout参数同put方法

其非阻塞方法为`get_nowait()`相当与get(False)

# 先入先出 maxsize 可设置大小,设置block=False抛异常
class queue.Queue(maxsize=0)  

 # 后进先出 
class queue.LifoQueue(maxsize=0)

# 存储数据时可设置优先级的队列
# 优先级设置数越小等级越高
class queue.PriorityQueue(maxsize=0) 

# 放入数据
Queue.put(item, block=True, timeout=None)

# 取出数据 #没有数据将会等待
Queue.get(block=True, timeout=None)

# 如果1秒后没取到数据就退出
Queue.get(timeout = 1)


# 取数据,如果没数据抛queue.Empty异常
Queue.get_nowait()

# 查看队列大小
Queue.qsize()

# 返回True,如果空
Queue.empty() #return True if empty  

# 设置队列大小
Queue.full() 

# 后续调用告诉队列,任务的处理是完整的。
Queue.task_done()

empty()

如果队列为空,返回True,反之返回False

 

生产者消费者模型:

import threading,time
import queue

# 最多存入10个
q = queue.Queue(maxsize=10)

def producer(name):
    count = 1

    while True:

           # 生产一块骨头
            q.put("骨头 %s" % count )
            print("生产了骨头",count)
            count +=1
            time.sleep(0.3)

def consumer(name):
    while True:
        print("%s 取到[%s] 并且吃了它" %(name, q.get()))
        time.sleep(1)

       # 告知这个任务执行完了
        q.task_done() 

# 生成线程
p = threading.Thread(target=producer,args=("德国骨科",))
c = threading.Thread(target=consumer,args=("陈狗二",))
d = threading.Thread(target=consumer,args=("吕特黑",))

# 执行线程
p.start()
c.start()
d.start()

发表评论

电子邮件地址不会被公开。 必填项已用*标注