Go 部落格
Go 並行模式:管道和取消
前言
Go 的並行原語簡化串流資料管道的建構,能有效運用 I/O 和多個 CPU。本文說明這類管道的範例,重點說明執行作業失敗時的微妙差異,並詳細說明處理失敗的技巧。
什麼是管道?
Go 中沒有管道的正式定義;它只是並行程式的其中一種。非正式來說,管道是一連串由通道連接的「階段」,其中每個階段都是一組執行相同函式的 goroutine。在每個階段,goroutine 會
- 透過「輸入」通道從「上游」接收值
- 對資料執行一些函式,通常產生新值
- 透過「輸出」通道將值傳送至「下游」
各階段都有任意數量的輸入和輸出通道,但第一和最後階段分別僅有輸出或輸入通道。第一階段有時稱為來源或產生器;最後階段稱為水槽或消費者。
我們將從一個簡單的範例管道開始說明概念和技術。稍後,我們將展示更實際的範例。
平方數字
考慮一個有三個階段的管道。
第一個階段 gen
是將整數清單轉換成一個通道的函式,該通道會發出清單中的整數。gen
函式會啟動一個 goroutine,在通道上傳送整數,並在所有值都傳送完畢後關閉通道。
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
第二個階段 sq
從一個通道中接收整數,並回傳一個通道來發出每個接收整數的平方。當輸入通道關閉且此階段已將所有值往下游傳送後,它就會關閉輸出通道。
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
main
函式設定管道並執行最後階段:它從第二個階段接收值,並列印每個值,直到通道關閉為止。
func main() { // Set up the pipeline. c := gen(2, 3) out := sq(c) // Consume the output. fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 }
由於 sq
的輸入和輸出通道型別相同,我們可以隨意組合多次。我們也可以將 main
重新寫成一個範圍迴圈,就像其他階段一樣。
func main() { // Set up the pipeline and consume the output. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } }
分岔輸出、分岔輸入
多個函式可以從同一個通道讀取,直到該通道關閉為止;這稱為分岔輸出。這提供了一個將工作分配到一組工作者間的方法,以將 CPU 用量和 I/O 平行化。
一個函式可以從多個輸入讀取,並持續執行,直到透過將輸入通道多工至一個單一通道來關閉所有通道,而這個單一通道會在所有輸入都關閉後關閉。這稱為分岔輸入。
我們可以變更我們的管道來執行 sq
的兩個執行個體,每個執行個體都從同一個輸入通道讀取。我們引入一個新函式 merge 來分岔輸入結果。
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the merged output from c1 and c2. for n := range merge(c1, c2) { fmt.Println(n) // 4 then 9, or 9 then 4 } }
merge
函式將通道清單轉換成單一通道,方法是針對每個輸入通道啟動一個 goroutine,將值複製到唯一的輸出通道。一旦所有 output
goroutine 都已啟動,merge
會啟動另一個 goroutine,在該通道上的所有傳送都完成後關閉輸出通道。
在已關閉的通道上傳送會引發 panic,因此在呼叫關閉之前確保所有傳送都已完成非常重要。sync.WaitGroup
型別提供了一種安排此同步的簡單方式。
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done. output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out }
停止縮短
我們的管道函式有一個模式。
- 當所有傳送作業都完成時,階段會關閉其輸出通道。
- 階段會持續從輸入通道接收值,直到這些通道都關閉為止。
此模式允許每個接收階段寫成 range
迴圈,並確保一旦所有值都已成功往下游傳送,所有 goroutine 都會結束執行。
但在實際的管道中,階段並不總是收到所有輸入值。有時這是預期的:接收器可能只需要一個子集的值來進行處理。通常,一個階段會及早退出,因為一個輸入值表示前面階段的錯誤。不論在什麼情況下,接收器都不會需要等待其餘的值,而我們希望前面的階段停止產生後面的階段不需要的值。
如果一個階段無法消耗所有輸入值,在我們的範例管道中,嘗試傳送這些值的goroutines將會無限期地封鎖
// Consume the first value from the output. out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // Since we didn't receive the second value from out, // one of the output goroutines is hung attempting to send it. }
這是資源外洩:goroutines會消耗記憶體和執行時間資源,而goroutine堆疊中的堆疊參考會防止資料被垃圾回收器回收。goroutines不會被垃圾回收;它們必須自行結束。
我們需要安排我們的管道的上游階段可以在下游階段無法收到所有輸入值的情況下結束。執行此項任務的方法之一是變更外送通道以具有緩衝區。緩衝區可以存放固定數量的值;如果緩衝區有空間,則傳送作業會立即完成
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
當在通道建立時已知要傳送的值數量,緩衝區可以簡化程式碼。例如,我們可以重新撰寫gen
將整數清單複製到一個有緩衝區的通道,並避免建立一個新的goroutine
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out }
回到我們管道中封鎖的goroutines,我們可以考量為merge
回傳的外送通道新增一個緩衝區
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // enough space for the unread inputs // ... the rest is unchanged ...
雖然這修復了此程式中的封鎖goroutine,但這是個不佳的程式碼。這裡的緩衝區大小選擇為1,仰賴知道merge
會收到的值數量和下游階段會消耗的值數量。這很脆弱:如果我們傳遞一個額外的值給gen
,或如果下游階段讀取少於這個數字的值,我們將會再次出現封鎖的goroutines。
相反地,我們需要提供一種方式讓下游階段表示給傳送者它們將會停止接受輸入。
明確的取消
當main
決定退出,且未從out
收到所有值,它必須告訴上游階段的goroutines放棄它們嘗試傳送的值。它透過在稱作done
的通道上傳送值來這麼做。由於可能有兩個封鎖的傳送者,所以它傳送兩個值
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the first value from output. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // Tell the remaining senders we're leaving. done <- struct{}{} done <- struct{}{} }
傳送goroutines用一個select
陳述式取代它們的傳送作業,該陳述式在out
上的傳送發生或它們從done
收到值時繼續進行。done
的值類型是空的結構,因為這個值並沒差:它是接收事件,表示out
上的傳送應被放棄。output
goroutines會在其輸入通道c
上持續迴圈,因此上游階段不會被封鎖。(稍後我們會討論如何讓此迴圈提前回傳。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed or it receives a value // from done, then output calls wg.Done. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
這種方法有個問題:每個下游接收器都需要知道潛在阻塞的上游發送器數量,並安排在早期回傳時通知這些發送器。持續追蹤這些數量既繁瑣又容易出錯。
我們需要一種方法,能同時告訴數量不明且無限多的 goroutine 停止將其數值傳送給下游。在 Go 中,我們可以透過關閉管道來做到這點,因為 在已關閉管道上執行接收動作可以立即進行,會產生元素類型的零值。
這表示 main
可以藉由關閉 done
管道解除所有發送器的封鎖。此關閉動作實際上是傳送給發送器的廣播訊號。我們對每個管道函數進行延伸,使其接受 done
為參數,並透過 defer
陳述式安排關閉,這樣所有從 main
回傳的路徑都會訊號管道階段要結束。
func main() { // Set up a done channel that's shared by the whole pipeline, // and close that channel when this pipeline exits, as a signal // for all the goroutines we started to exit. done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(done, in) c2 := sq(done, in) // Consume the first value from output. out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // done will be closed by the deferred call. }
我們每個管道階段現在都能在 done
關閉時立即回傳。merge
中的 output
函式可以在未清空輸入管道的情況下回傳,因為它知道上游發送器 sq
會在 done
關閉時停止嘗試傳送。output
會確保透過 defer
陳述式,在所有回傳路徑上都呼叫 wg.Done
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
類似地,一旦 done
關閉,sq
就可以回傳。sq
會確保在所有回傳路徑上都關閉其 out
管道,方法是透過 defer
陳述式
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out }
以下是建立管道的準則
- 當所有傳送作業都完成時,階段會關閉其輸出通道。
- 各階段會持續從輸入管道接收數值,直到這些管道關閉,或發送器取消封鎖。
管道會透過確保所有送出的數值有足夠的緩衝區,或在接收器可能捨棄管道時明確通知發送器,來取消發送器的封鎖。
消化樹狀結構
讓我們考慮更實際的管道。
MD5 是訊息摘要演算法,可作為檔案檢查和演算法使用。指令列工具程式 md5sum
會印出檔案清單的摘要值。
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的範例程式就像 md5sum
,但它會將單一目錄當成引數,並印出該目錄下每個一般檔案,以路徑名稱排序的摘要值。
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們程式的主要函式會呼叫輔助函式 MD5All
,它會回傳一個從路徑名稱對應到摘要值的對應,然後排序並印出結果
func main() { // Calculate the MD5 sum of all files under the specified directory, // then print the results sorted by path name. m, err := MD5All(os.Args[1]) if err != nil { fmt.Println(err) return } var paths []string for path := range m { paths = append(paths, path) } sort.Strings(paths) for _, path := range paths { fmt.Printf("%x %s\n", m[path], path) } }
MD5All
函式是我們討論的重點。在 serial.go 中,實作不使用任何並行處理,只是在巡覽樹狀結構時讀取並對每個檔案進行加總。
// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil }
平行消化
在 parallel.go 中,我們將 MD5All
分割成一個兩階段的串流。第一階段,sumFiles
,走查目錄樹,在一個新的 goroutine 中對每個檔案進行摘要,並在一個類型為 result
的頻道上傳送結果
type result struct { path string sum [md5.Size]byte err error }
sumFiles
返回兩個頻道:一個頻道用於 results
,另一個頻道用於由 filepath.Walk
返回的錯誤。走查函式開始一個新的 goroutine 來處理每個常規檔案,然後檢查 done
。如果 done
已關閉,則走查將立即停止
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { // For each regular file, start a goroutine that sums the file and sends // the result on c. Send the result of the walk on errc. c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } wg.Add(1) go func() { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: } wg.Done() }() // Abort the walk if done is closed. select { case <-done: return errors.New("walk canceled") default: return nil } }) // Walk has returned, so all calls to wg.Add are done. Start a // goroutine to close c once all the sends are done. go func() { wg.Wait() close(c) }() // No select needed here, since errc is buffered. errc <- err }() return c, errc }
MD5All
從 c
收到摘要值。MD5All
在發生錯誤時提前返回,並透過一個 defer
關閉 done
func MD5All(root string) (map[string][md5.Size]byte, error) { // MD5All closes the done channel when it returns; it may do so before // receiving all the values from c and errc. done := make(chan struct{}) defer close(done) c, errc := sumFiles(done, root) m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil }
有界平行
在 parallel.go 中實作的 MD5All
,會為每個檔案開啟一個新的 goroutine。在一個有許多巨大檔案的目錄中,這可能會分配比機器上可用的記憶體還多的記憶體。
我們可以透過限制平行讀取的檔案數量,來限制這些分配。在 bounded.go 中,我們建立一個定數量的 goroutine 來讀取檔案,即可達成這個目的。我們的串流現在有三個階段:走查目錄樹、讀取和摘要檔案,以及收集摘要。
第一階段,walkFiles
,發出樹中的常規檔案路徑
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) errc := make(chan error, 1) go func() { // Close the paths channel after Walk returns. defer close(paths) // No select needed for this send, since errc is buffered. errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() return paths, errc }
中間階段啟動一個定數量的 digester
goroutines,從 paths
接收檔名,並在頻道 c
上傳送 results
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
跟我們之前的範例不同,digester
不會關閉它的輸出頻道,因為有多個 goroutines 在一個共用頻道上傳送資料。取而代之的是,MD5All
中的程式碼安排在所有 digester
都完成時關閉頻道
// Start a fixed number of goroutines to read and digest files. c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }()
我們也可以讓每個 digester 建立並傳回自己的輸出頻道,但這樣一來,我們將需要額外的 goroutine 來將結果扇入。
最後一個階段從 c
接收所有 results
,然後檢查 errc
中的錯誤。此檢查無法在更早的時間點發生,因為在此點之前,walkFiles
可能會封鎖傳送下游的資料
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // Check whether the Walk failed. if err := <-errc; err != nil { return nil, err } return m, nil }
結論
這篇文章介紹了在 Go 中建構串流資料串流處理的技術。處理此類串流處理中的失敗很棘手,因為串流處理中的每個階段都可能封鎖試圖傳送下游的資料,而下游階段可能不再關心傳入的資料。我們展示了如何關閉一個頻道,來對串流處理啟動的所有 goroutine 廣播一個「完成」訊號,並定義了如何正確建構串流處理的準則。
深入閱讀
- Go 並發模式(影片)說明了 Go 的並發原語基礎,以及幾種應用方式。
- 進階 Go 並發模式(影片)涵蓋 Go 原語較複雜的用途,特別是
select
。 - Douglas McIlroy 的論文 Squinting at Power Series 說明了類似 Go 的並發如何優雅地支援複雜計算。
下一篇文章:Go 地鼠
上一篇文章:FOSDEM 2014 的 Go 演講
網誌索引