快速掌握Lua 5.3 —— Coroutines


Q:什么是”Coroutine”?

A:有些類似於多線程,但他們之間也有區別,
1、從概念上來看,多線程是同一時間所有的線程同時都在運行。而一組”Coroutines”在同一時間只有一個”Coroutine”在運行。
2、從應用場景來看,多線程一般起到分流的作用,每個線程專注做自己的事情,線程之間合作的關系較弱。而一組”Coroutines”之間合作的關系就比較強,他們都是在做同一件事情,他們分攤了這件事情中的工作。

Q:如何使用”Coroutines”?

A:

-- 創建"Coroutines"。
co = coroutine.create(function()
         print("hi")
     end)
print(type(co))    --> thread -- "Coroutine"是個"thread"。

-- "Coroutine"有3種狀態:掛起,運行,死亡。
-- 當"Coroutine"被創建之后默認是掛起狀態。
print(coroutine.status(co))    --> suspended
-- 運行"Coroutine"。
coroutine.resume(co)    --> hi
-- "Coroutine"在運行完成之后變為死亡狀態。
print(coroutine.status(co))    --> dead

目前看起來”Coroutine”與普通的函數調用沒有太大區別,但他真正強大的地方在於coroutine.yield(),它可以讓運行中的”Coroutine”掛起,

co = coroutine.create(function ()
         for i=1, 10 do
             print("co", i)
             coroutine.yield()    -- 掛起"Coroutine"。
         end
     end)
coroutine.resume(co)    --> co 1
coroutine.resume(co)    --> co 2
coroutine.resume(co)    --> co 3
...
co)    --> co 10
coroutine.resume(co)    -- prints nothing -- "for"循環結束。

以及coroutine.resume()coroutine.yield()之間可以方便的交換數據,

-- 可以通過"coroutine.resume()"向"coroutine"調用的函數傳遞參數。
co = coroutine.create(function(a, b)
         print("co", a, b)
     end)
coroutine.resume(co, 1, 2)    --> co 1 2

--[[ 可以通過"coroutine.yield()"向"coroutine.resume()"傳遞參數。 "coroutine.yield()"傳遞的參數由"coroutine.resume()"的返回值呈現。]]
co = coroutine.create(function(a, b)
         for i = 1, 3 do
             print(i, a, b)
             coroutine.yield(i, a + b, a - b)
         end
     end)
print(coroutine.resume(co, 20, 10))
print(coroutine.resume(co, 50, 10))
print(coroutine.resume(co, 70, 90))
--[[ 結果。 這里可以看到,第一次"coroutine.resume()"所傳遞的參數, 將作為之后每次"coroutine.resume()"所傳遞的參數, 無論之后的"coroutine.resume()"是否指定了新的參數。]]
1   20  10
true    1   30  10
2   20  10
true    2   30  10
3   20  10
true    3   30  10

--[[ 對稱的,可以通過"coroutine.resume()"向"coroutine.yield()"傳遞參數。 "coroutine.resume()"傳遞的參數由"coroutine.yield()"的返回值呈現。]]
co = coroutine.create (function(a, b)
         for i = 1, 3 do
             print(i, a, b)
             print("co", coroutine.yield())
         end
     end)
coroutine.resume(co, "first", "call", "discard")
coroutine.resume(co, "second", "call")
coroutine.resume(co, "third", "call")
coroutine.resume(co, "fourth", "call")
--[[ 結果。 這里可以看到, 1、從創建"coroutine"之后的第一次"coroutine.resume()" 傳遞的多余參數被丟棄了。 2、與上面的例子相同,第一次"coroutine.resume()"所傳遞的參數, 將作為之后每次"coroutine.resume()"所傳遞的參數, 無論之后的"coroutine.resume()"是否指定了新的參數。 3、第一次之后的"coroutine.resume()"所傳遞的參數 都會被視作傳遞給"coroutine.yield()"的參數。 4、當"coroutine"被掛起時,"coroutine.yield()"並不返回, 猜測可能是在"coroutine.yield()"內部暫停了。 之后當"coroutine"被恢復時,"coroutine.yield()"才返回, 猜測可能是從"coroutine.yield()"內部繼續運行到"coroutine.yield()"結束返回。 以上的現象與"coroutine"的運行流程相對應, 在創建"coroutine"之后的第一次"coroutine.resume()" 是從創建"coroutine"時指定的函數開始運行, 之后的每次"coroutine.resume()" 是從"coroutine.yield()"內部暫停的地方開始運行。]]
1   first   call
co  second  call
2   first   call
co  third   call
3   first   call
co  fourth  call

--[[ 當"coroutine"轉換為死亡狀態時, 創建"coroutine"時所指定的函數所返回的值也會傳遞給"coroutine.resume()", 由"coroutine.resume()"的返回值呈現。]]
co = coroutine.create(function ()
         for i = 1, 3 do
             coroutine.yield()
         end

         return 6, 7
     end)
print(coroutine.resume(co))   --> true
print(coroutine.resume(co))   --> true
print(coroutine.resume(co))   --> true
print(coroutine.resume(co))   --> true 6 7

Q:使用”Coroutines”的例子?

A:協同程序的一個典型例子是生產者和消費者問題。比如一個函數不斷產生值(從一個文件中讀取數據,此例中是從標准輸入中讀取),另一個函數不斷消耗值(將數據寫到另一個文件中,此例中是向標准輸出寫)。

function receive ()
    local status, value = coroutine.resume(producer)
    return value
end

function send (x)
    coroutine.yield(x)
end

function consumer ()
    while true do
        local x = receive()        -- receive from producer
        io.write(x, "\n")          -- consume new value
    end
end

producer = coroutine.create(
function ()
    while true do
        local x = io.read()     -- produce new value
        send(x)                 -- send to consumer
    end
end)

consumer()     -- 從消費者開始。
--[[ 消費者寫數據需要先接收數據,調用"receive()",
     "receive()"內部會恢復"coroutines"的運行,並等待數據,
     這樣主導權到了生產者手中,
     生產者讀取數據並發送數據,調用"send()""send()"內部掛起"coroutines",
     此時數據通過"coroutine.yield()"傳遞給了"coroutine.resume()""coroutine.resume()"返回數據給"receive()",寫數據。
     循環往復。]]

我們可以過濾器擴展這個例子,過濾器在生產者與消費者之間,可以對數據
進行某些轉換處理。過濾器在同一時間既是生產者,也是消費者。說的更具體一些,過濾器在同一時間,對於真正的生產者,它是消費者,對於真正的消費者,它是生產者。
我們修改上面的例子,加入打印行號的功能,

function receive (prod)
    local status, value = coroutine.resume(prod)
    return value
end

function send (x)
    coroutine.yield(x)
end

function producer ()
    return coroutine.create(function ()
        while true do
            local x = io.read()
            -- 掛起producer(即自己)所在的"coroutines",返回給filter數據。
            send(x)
        end
    end)
end

function filter (prod)
    return coroutine.create(function ()
        local line = 1
        while true do
            -- 恢復producer所在的"coroutines",以讓producer提供數據。
            local x = receive(prod)
            x = string.format("%5d %s", line, x)    -- 行號。
            -- 掛起filter(即自己)所在的"coroutines",返回給consumer數據。
            send(x)
            line = line + 1
        end
    end)
end

function consumer (prod)
    while true do
        -- 恢復filter所在的"coroutines",以讓filter提供數據。
        local x = receive(prod)
        io.write(x, "\n")
    end
end

--[[ "producer()"創建了一組"coroutines",由filter掌控;
     "filter()"創建了一組"coroutines",由consumer掌控。]]
consumer(filter(producer()))

上面這個例子可能很自然的讓你想到”UNIX”的管道。管道的方式下每一個任務在獨立的進程中運行,而”coroutines”方式下每個任務運行在獨立的”coroutine”中。進程之間的切換代價很高,而”coroutine”的切換的代價大致相當於函數之間的切換。

Q:如何使用”Coroutines”實現”Iterator”?

A:”Iterator”是一種典型的“生產者 - 消費者”模式,由”Iterator”產生值,由循環體消耗值。下面是一個打印數組內元素全排列的例子,我們先用遞歸來實現它,

-- 算法很簡單,就是依次將數組中每一個元素放到數組的最后,然后計算余下元素組成的數組的全排列。
function permgen (a, n)
    if n == 0 then
        printResult(a)
    else
        for i=1,n do
            -- put i-th element as the last one
            a[n], a[i] = a[i], a[n]

            -- generate all permutations of the other elements
            permgen(a, n - 1)

            -- restore i-th element
            a[n], a[i] = a[i], a[n]
        end
    end
end

function printResult (a)
    for i,v in ipairs(a) do
        io.write(v, " ")
    end
    io.write("\n")
end

permgen ({1,2,3,4}, 4)

接下來,將其轉換為”Coroutines”的實現方式就很簡單了,

function permgen (a, n)
    if n == 0 then
        -- 一旦得到排列結果,"coroutine"掛起,返回結果給"coroutine.resume()"。
        coroutine.yield(a)
    else
        for i=1,n do

            -- put i-th element as the last one
            a[n], a[i] = a[i], a[n]

            -- generate all permutations of the other elements
            permgen(a, n - 1)

            -- restore i-th element
            a[n], a[i] = a[i], a[n]

        end
    end
end

-- 內部創建"coroutine"以及返回"iterator function"。
function perm (a)
    local n = #a -- 獲得數組的大小。
    -- "coroutine"負責以遞歸的方式產生排列結果。
    local co = coroutine.create(function () permgen(a, n) end)
    -- "iterator function"負責恢復"coroutine"以從"coroutine.yield"得到排列結果。
    return function ()
        local code, res = coroutine.resume(co)
        return res
    end
end

function printResult (a)
    for i,v in ipairs(a) do
        io.write(v, " ")
    end
    io.write("\n")
end

for p in perm{"a", "b", "c"} do
    printResult(p)
end

perm()使用了Lua中一種常用的模式:將恢復”coroutine”的操作封裝在一個函數里。這種模式在Lua中經常被使用,所以Lua提供coroutine.wrap()來實現這種操作。與coroutine.create()相比,coroutine.create()返回”coroutine”本身,而coroutine.wrap()在內部調用coroutine.resume()后,等待coroutine.resume()返回並返回其結果。當coroutine.resume()成功恢復了”coroutine”時,coroutine.wrap()返回coroutine.resume()除了第一個返回值(即”errcode”,true or false)外的其他返回值。而當coroutine.resume()恢復”coroutine”失敗時,coroutine.wrap()直接報錯。
所以,我們可以將原先perm()的實現更改為使用coroutine.wrap()

function perm (a)
  local n = #a
  return coroutine.wrap(function () permgen(a, n) end)
end

Q:如何使用”coroutines”實現搶占式線程?

A:”coroutines”是非搶占式的協作線程,這意味着在明確的指明要他停止(通過調用coroutine.yield())之前,他不能被其他線程打斷,這種機制在某些需要高實時性的場合是不能被接收的。
搶占式線程的“搶占”其原理就在於CPU為每個線程分配一個時間片,當時間片耗盡而線程的工作還沒有完成時,CPU就強制將該線程暫停同時讓另一個線程開始工作。
使用”coroutines”也可以實現搶占式線程,其原理就在於由我們自己為每一對”coroutine”分配時間片。以下的例子簡單的實現了搶占式線程,每個線程的工作都是計算某個整數區間內所有整數之和,

threads = {}    -- 調度表。所有工作中的線程都會存入此表。
time = os.time()    -- 每個線程開始工作時的時間。當線程被掛起時會更新這個時間。
limit_time = 1    -- 時間片。每個線程給1s的工作時間。

-- 計算某個整數區間內所有整數之和。
function cal(from, to)
    local sum = 0;
    for i = from, to do
        sum = sum + i
        -- 時間片耗盡,而工作還沒有完成。
        if (os.time() - time) >= limit_time then
            -- 打印計算進度。
            print(string.format("Worker %d calculating, %f%%.", worker, (i / to * 100)))
            time = os.time()    -- 當進程被掛起時更新時間,下一個進程將以此作為開始工作時的時間。
            coroutine.yield()    -- 休息。
        end
    end
    -- 工作完成,打印計算結果。
    print(string.format("Worker %d finished, %d.", worker, sum))
end

-- 分配任務。
function job (from, to)
    -- 創建"coroutine"。
    local co = coroutine.create(function ()
        cal(from, to)
    end)
    table.insert(threads, co)    -- 將線程加入調度表。
end

-- 4個線程,分別計算不同整數區間內所有整數之和。
job(1, 100000000)
job(10, 50000000)
job(5000, 6000000)
job(10000, 70000000)

-- 分發器。調度所有線程的運行。
while true do
    local n = #threads
    if n == 0 then break end   -- 沒有線程需要工作了。
    for i = 1, n do
        worker = i    -- 表示哪個線程在工作。
        local status = coroutine.resume(threads[i])    -- 恢復"coroutine"工作。
        if not status then    -- 線程是否完成了他的工作?"coroutine"完成任務時,status是"false"。
            table.remove(threads, i)    -- 將線程從調度表中刪除。
            break
        end
    end
end

Worker 1 calculating, 0.482016%.
Worker 2 calculating, 10.191070%.
Worker 3 calculating, 85.029767%.
Worker 4 calculating, 7.273926%.
Worker 1 calculating, 5.534152%.
Worker 2 calculating, 20.370044%.
Worker 3 finished, 17999990502500. <– 3號線程完成工作。
Worker 4 calculating, 13.201869%.
Worker 1 calculating, 10.590240%.
Worker 2 calculating, 30.563072%.
Worker 1 calculating, 15.656026%.
Worker 2 calculating, 40.731870%.
Worker 3 calculating, 20.430621%. <– 原先的4號線程變為3號線程。
Worker 1 calculating, 20.689852%.
Worker 2 calculating, 50.912444%.
Worker 3 calculating, 27.682421%.
Worker 1 calculating, 25.750115%.
Worker 2 calculating, 61.074508%.

實現的思路也很清晰。一個調度器擁有一張調度表,其中存儲了需要工作的線程。調度器為每個線程分配一個時間片,當發現線程的時間片耗盡,則掛起線程同時讓調度表中的下一個開始工作。當發現線程的工作完成時,則從調度表中移除該線程。總的來說,通過調度器與時間片,使用”coroutines”實現了簡單的搶占式線程。

附加:

1、coroutine.resume()會返回一個”bool”值,表示是否成功的恢復了掛起的”Coroutine”,

-- 處於掛起狀態的"Coroutine"可以恢復。
print(coroutine.status(xxx))    --> suspended
print(coroutine.resume(xxx))    --> true
-- 已處於死亡狀態的"Coroutine"無法恢復。
print(coroutine.status(xxx))    --> dead
print(coroutine.resume(xxx))    --> false cannot resume dead coroutine

2、在生產者和消費者問題的第一個例子中,
開始時調用消費者,當消費者需要值時他喚起生產者生產值,生產者生產值后停止直到消費者再次請求。我們稱這種設計為消費者驅動的設計。
3、”coroutines”是一種協作的多線程。每一個”coroutine”相當於一個線程。”yield-resume”的組合可以在線程之間互相轉換。然而,區別於真正的多線程,”coroutines”是非搶占的。當一個”coroutine”在運行的時候,他不能被其他的線程所打斷,除非明確的指明要他停止(通過調用coroutine.yield())。編寫非搶占式的多線程也比編寫搶占式的多線程容易的多,因為你不用擔心線程之間的同步問題所造成的bugs,你只需要確定當”coroutine”被掛起時他不是處在臨界區。


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
  © 2014-2022 ITdaan.com 联系我们: