python並發編程之Queue線程、進程、協程通信(五)


單線程、多線程之間、進程之間、協程之間很多時候需要協同完成工作,這個時候它們需要進行通訊。或者說為了解耦,普遍采用Queue,生產消費模式。

系列文章

同步deque和多線程Queue

程序有時需要在列表的端點進行操作,比list更加優化的數據結構有Queue和deque。

deque

deque一般用在定長隊列,多余的數據會被丟棄,這個隊列是線程非安全的。

from queue import Queue, deque

# 大於會截取后面的一段
q = deque(iterable=[1,2,3,4], maxlen=5)
# 參數iterable可以是任何可迭代對象,maxlen代表定長

# 添加與取出
q.append(3) # 從尾部添加
q.pop() # 從尾部彈出一個
q.appendleft(4) # 從首部添加
q.popleft() # 從首部彈出
q.clear() # 清空隊列
q.extend([1, 3, 3])  # 將原來的隊列從右側擴展
q.extendleft()  # 將原來的隊列從左側擴展
q.insert(2, 3)  # 在索引為2的位置插入3,如果隊列已達到最大,拋出異常

# 復制
q1 = q.copy() # 完全符合一份隊列
# 統計
n = q.count(3) # 統計某個值的數目
x = q.index(3)  # 查找某個值的位置
# 變換
q.reverse() # 將原來的q翻轉
q.remove(3) # 刪除隊列中的所有的3
q.rotate(2) # 向右旋轉兩步

Queue

Queue提供更加完整成熟的隊列操作,相對於deque來說偏重型,他是線程安全的隊列。

  • 方法和屬性分析
from queue import Queue, deque

q = Queue(maxsize=5) #maxsize<=0,隊列長度沒有限制,這個Queue是線程安全的,通過鎖機制保證
print(q.queue) # 一個deque隊列
print(q.mutex) # 隊列的線程鎖
print(q.not_empty) # 非空通知,用在多線程
print(q.not_full) # 非滿通知,用在多線程
print(q.all_tasks_done) # 完成的任務
print(q.maxsize)
print(q.unfinished_tasks) # 隊列未完成的任務數量,即隊列目前的數目

# 數據存取
q.put(3, block=True, timeout=3) # 向隊列左邊添加數據,block為True隊列滿了阻塞等待,block為false則直接拋出異常
q.get(block=True, timeout=3) # 隊列取出數據,超時拋出異常,block為false忽略timeout
# q.get_nowait() # 立即獲取,沒有拋出異常
q.put_nowait(4) # 立即插入,已滿拋出異常
# 判斷
q.full() # 判斷當前隊列是否已滿,滿了返回True
q.empty() # 判斷當前隊列是否為空,空返回True

# 統計
q.task_done() # 用來通知隊列任務完成
q.qsize() # 當前隊列的任務數量,不絕對可靠

q.join() # 阻塞直到所有的任務完成,即q.unfinished_tasks降為0
  • 實例
from threading import Thread
from queue import Queue, deque
import time

def get_from_queue(queue:Queue):
    while True:
        if not queue.empty():
            print(queue.get_nowait())
            queue.task_done() # 任務完成

def put_to_queue(queue:Queue):
    for i in range(100):
        if not queue.full():
            queue.put_nowait(i)
        else:
            time.sleep(0.1)

q = Queue(5)
th1 = Thread(target=get_from_queue, args=(q,))
th2 = Thread(target=put_to_queue, args=(q,))
th1.start()
th2.start()

進程間通訊

multiprocessing的Queue對象可以作為進程間通訊的第三者。

from multiprocessing import Queue, Process, Pool
import time

def get_from_queue(queue:Queue):
    while True:
        if not queue.empty():
            print(queue.get_nowait())

def put_to_queue(queue:Queue):
    for i in range(100):
        if not queue.full():
            queue.put_nowait(i)
        else:
            time.sleep(0.1)
if __name__ == '__main__':
    q = Queue(9) # 這個Queue可以在多個進程之間共享
    p1 = Process(target=get_from_queue, args=(q,))
    p2 = Process(target=put_to_queue, args=(q,))
    p1.start()
    p2.start()

multiprocessing.Queue對象

Queue對象的大部分方法和Queue.Queue的方法相同,用法也一樣,但有幾個特殊的方法:

q = Queue(9) # 這個Queue可以在多個進程之間共享
# q.close() # 關閉隊列,不再接收數據
# q.cancel_join_thread() # 取消阻塞等待
q.join_thread() # 線程阻塞等待

gevent協程的Queue

gevent.queue.Queue基於協程,Queue在多個協程間共享,Queue實現了迭代器協議,可以使用for循環遍歷。

from gevent.queue import Queue
import gevent
import time

def get_from_queue(queue:Queue, n):
    i = 0
    print('start---get--{}'.format(n))
    while True:
        print(str(queue.get()) + 'get' + str(n))
        i += 1
        if i == 100:
            break

def put_to_queue(queue:Queue, n):
    i = 0
    print('start---put--{}'.format(n))
    while True:
        queue.put(i)
        print(str(i) + 'put' + str(n))
        i += 1
        if i == 100:
            break
if __name__ == '__main__':
    q = Queue(9) # 這個Queue可以在多個進程之間共享
    job1 = [gevent.spawn(put_to_queue, q,i) for i in range(2)]
    job2 = [gevent.spawn(get_from_queue, q,i) for i in range(2)]
    job1.extend(job2)
    gevent.joinall(job1)

協程啟動后會按照添加到循環的順序開始執行,上例在隊列未滿之前一直執行put操作,直到隊列滿后阻塞就切換到put2協程,也會立即阻塞,然后切換到get1協程,獲取所有的值直到隊列為空后阻塞切換。

gevent.queue.Queue對象

其方法基本和Queue.Queue的方法相同,特殊方法如下:

q = Queue(9, items=[1,2,3, StopIteration]) # 實現迭代協議,最后一個必須是StopIteration
# q.copy() #復制一個隊列
x = q.next() # 喚醒獲取值
q.peek(block=True, timeout=None) # 獲取一個值但是不刪除它
q.peek_nowait() # 立即獲取,忽略timeout
q.put() # 會喚醒多個協程完成添加操作
q.get() # 會掛起多個協程

gevent.queue.JoinableQueue對象擴展了Queue的功能,添加了task_done和join方法。

q = JoinableQueue(9, items=[1,2,3, StopIteration]) # 這個Queue可以在多個進程之間共享
q.task_done() # 通知隊列一個任務完成
q.unfinished_tasks # 未完成的任務計數
q.join() # 阻塞等待任務完成,如果unfinished_tasks降為0,則解除
  • 實例
from gevent.queue import Queue, JoinableQueue
import gevent
import time

def get_from_queue(queue:JoinableQueue):
    while True:
        try:
            x = queue.get() # 阻塞時就會切換協程
            print(x)
        finally:
            queue.task_done()

if __name__ == '__main__':
    q = JoinableQueue(8)
    job1 = [gevent.spawn(get_from_queue, q) for i in range(2)]
    for i in range(100):
        q.put(i) # 當Put被阻塞時將切換協程,
    q.join() # 如果不等待的話,最后一次put后將直接退出

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com