本文由 Google翻译-原文链接查看末尾
介绍
当我第一次开始使用 Go 的通道时,我错误地将通道视为一种数据结构。我将通道视为提供 goroutine 之间自动同步访问的队列。这种结构理解导致我写了很多糟糕而复杂的并发代码。
随着时间的推移,我了解到最好忘记渠道的结构,而专注于它们的行为方式。所以现在说到渠道,我想到一件事:信令。通道允许一个 goroutine 向另一个 goroutine 发送关于特定事件的信号。信号是您应该对频道做的一切的核心。将通道视为一种信号机制将使您能够编写具有明确定义和更精确行为的更好的代码。
要了解信号的工作原理,我们必须了解它的三个属性:
- 交货保证
- 状态
- 有或没有数据
这三个属性共同创造了一种围绕信号的设计理念。在讨论这些属性之后,我将提供一些代码示例来演示应用这些属性的信令。
交货保证
交付保证基于一个问题:“我是否需要保证已收到特定 goroutine 发送的信号?”
换句话说,给出清单 1 中的这个例子:
清单1
01 go func() {
02 p := <-ch // Receive
03 }()
04
05 ch <- "paper" // Send
发送 goroutine 是否需要保证paper第 05 行通过通道发送的消息在继续之前被第 02 行的 goroutine 接收?
根据对这个问题的回答,您将知道要使用两种类型的通道中的哪一种:Unbuffered或Buffered。每个通道围绕交付保证提供不同的行为。
图 1:交货保证
保证很重要,如果你不这么认为,我有很多东西想卖给你。当然,我是想开个玩笑,但是生活没有保障你不紧张吗?在编写并发软件时,深入了解是否需要保证是至关重要的。随着我们继续,您将学习如何做出决定。
状态
通道的行为直接受其当前状态的影响。通道的状态可以是nil、open或closed。
下面的清单 2 显示了如何将通道声明或放置到这三种状态中的每一种中。
清单 2
// ** nil channel
// A channel is in a nil state when it is declared to its zero value
var ch chan string
// A channel can be placed in a nil state by explicitly setting it to nil.
ch = nil
// ** open channel
// A channel is in a open state when it’s made using the built-in function make.
ch := make(chan string)
// ** closed channel
// A channel is in a closed state when it’s closed using the built-in function close.
close(ch)
状态决定了发送和接收操作的行为方式。
信号通过通道发送和接收。不要说读/写,因为通道不执行 I/O。
图 2:状态
当通道处于nil状态时,通道上的任何发送或接收尝试都将被阻止。当通道处于打开状态时,可以发送和接收信号。当通道处于关闭状态时,将无法再发送信号,但仍然可以接收信号。
这些状态将为您遇到的不同情况提供所需的不同行为。将State与Guarantee Of Delivery结合使用时,您可以开始分析由于您的设计选择而产生的成本/收益。在许多情况下,您还可以通过阅读代码快速发现错误,因为您了解通道的行为方式。
有和没有数据
需要考虑的最后一个信令属性是您是否需要使用或不使用数据来发送信号。
您可以通过在通道上执行发送来发送数据信号。
清单 3
01 ch <- "paper"
当您用数据发出信号时,通常是因为:
- 一个 goroutine 被要求开始一项新任务。
- 一个 goroutine 报告一个结果。
您可以通过关闭通道在没有数据的情况下发出信号。
清单 4
01 close(ch)
当您在没有数据的情况下发出信号时,通常是因为:
- 一个 goroutine 被告知停止他们正在做的事情。
- 一个 goroutine 报告他们已经完成但没有结果。
- 一个 goroutine 报告它已完成处理并关闭。
这些规则也有例外,但这些是主要用例,也是我们将在本文中重点关注的用例。我将这些规则的例外情况视为初始代码异味。
在没有数据的情况下发送信号的一个好处是单个 goroutine 可以同时向多个 goroutine 发送信号。数据信号总是在 goroutine 之间进行 1 对 1 的交换。
用数据发信号
当您准备用数据发送信号时,您可以根据需要的保证类型选择三个通道配置选项。
图 3:用数据发送信号
三个通道选项是Unbuffered、Buffered >1或Buffered =1。
保证
- 一个无缓冲通道为您提供了保证,一个信号发送已收到。
- 因为信号的接收发生在信号的发送完成之前。
无保证
- 甲缓冲大小的信道> 1给你没有保证使信号发送已经被接收。
- 因为信号的发送发生在信号的接收完成之前。
延迟保证
- 甲缓冲大小的信道= 1给你一个延迟保证。它可以保证之前发送的信号已经被接收到。
- 由于接收了的第一信号,这之前的发送中的第二个信号完成。
缓冲区的大小绝不能是随机数,它必须始终针对某些明确定义的约束进行计算。计算中没有无限,无论是时间还是空间,一切都必须有一些明确定义的约束。
没有数据的信令
没有数据的信令主要保留用于取消。它允许一个 goroutine 向另一个 goroutine 发出信号以取消他们正在做的事情并继续前进。取消可以使用 Unbuffered 和 Buffered 通道来实现,但是在没有数据发送时使用 Buffered 通道是一种代码味道。
图 4:没有数据的信令
内置函数close用于在没有数据的情况下发出信号。如上面状态部分所述,您仍然可以在关闭的通道上接收信号。事实上,关闭通道上的任何接收都不会阻塞并且接收操作总是返回。
在大多数情况下,您希望使用标准库context包来实现没有数据的信令。该context包使用下面的无缓冲通道进行信令和内置函数close以在没有数据的情况下发出信号。
如果您选择使用自己的频道进行取消,而不是上下文包,则您的频道应该是 类型chan struct{}。这是指示仅用于信令的信道的零空间惯用方式。
场景
有了这些属性,进一步了解它们在实践中如何工作的最佳方法是运行一系列代码场景。当我阅读和编写基于通道的代码时,我喜欢将 goroutines 视为人。这种可视化确实有帮助,我将在下面将其用作辅助。
带数据的信号 – 保证 – 无缓冲通道
当您需要知道已接收到正在发送的信号时,会出现两种情况。它们是Wait For Task和Wait For Result。
场景 1 – 等待任务
考虑成为一名经理并雇用一名新员工。在这种情况下,您希望新员工执行一项任务,但他们需要等到您准备就绪。这是因为你需要在他们开始之前给他们一张纸。
清单 5
https://play.golang.org/p/BnVEHRCcdh
01 func waitForTask() {
02 ch := make(chan string)
03
04 go func() {
05 p := <-ch
06
07 // Employee performs work here.
08
09 // Employee is done and free to go.
10 }()
11
12 time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
13
14 ch <- "paper"
15 }
在清单 5 的第 02 行,创建了一个 Unbuffered 通道,其属性是string数据将与信号一起发送。然后在第 04 行,雇佣了一名员工并被告知在工作之前等待您在第 05 行的信号。第 05 行是通道接收,导致员工在等待您将发送的纸张时阻塞。一旦员工收到文件,员工就会执行工作,然后完成并可以自由离开。
您作为经理与您的新员工同时工作。因此,在第 04 行雇用员工后,您会发现自己(在第 12 行)正在做您需要做的事情来解除阻止并向员工发出信号。请注意,不知道准备这张需要发送的纸张需要多长时间。
最终,您已准备好向员工发出信号。在第 14 行,您使用数据执行信号,数据就是那张纸。由于正在使用 Unbuffered 通道,您可以保证一旦发送操作完成,员工已收到文件。接收发生在发送之前。
从技术上讲,您所知道的是,在您的频道发送操作完成时,员工已经拿到了论文。在两次通道操作之后,调度器可以选择执行它想要的任何语句。由您或员工执行的下一行代码是不确定的。这意味着使用打印语句可以在事物的顺序上欺骗您。
场景 2 – 等待结果
在下一个场景中,情况正好相反。这次你希望你的新员工在被录用后立即执行一项任务,你需要等待他们的工作结果。您需要等待,因为您需要他们的论文才能继续。
清单 6
https://play.golang.org/p/VFAWHxIQTP
01 func waitForResult() {
02 ch := make(chan string)
03
04 go func() {
05 time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
06
07 ch <- "paper"
08
09 // Employee is done and free to go.
10 }()
11
12 p := <-ch
13 }
在清单 6 的第 02 行,创建了一个 Unbuffered 通道,其属性是string数据将与信号一起发送。然后在第 04 行,雇用了一名员工并立即投入工作。在第 04 行雇用员工后,您会发现自己在第 12 行等待纸质报告。
员工在第 05 行完成工作后,他们会在第 07 行通过执行带有数据的通道发送将结果发送给您。由于这是一个无缓冲通道,接收发生在发送之前,并且员工可以保证您已经收到结果。一旦员工获得此保证,他们就完成并可以自由离开。在这种情况下,您不知道员工完成任务需要多长时间。
成本效益
无缓冲通道保证接收到正在发送的信号。这很棒,但没有什么是免费的。这种保证的代价是未知的延迟。在“等待任务”场景中,员工不知道您发送该文件需要多长时间。在等待结果场景中,您不知道员工需要多长时间才能将结果发送给您。
在这两种情况下,这种未知的延迟都是我们必须忍受的,因为需要保证。没有这种有保证的行为,逻辑就不起作用。
带数据的信号 – 无保证 – 缓冲通道 >1
当您不需要知道已接收到正在发送的信号时,这两种情况就会发挥作用:Fan Out和Drop。
缓冲通道有一个明确定义的空间,可用于存储正在发送的数据。那么你如何决定你需要多少空间呢?回答这些问题:
- 我是否有明确定义的工作量要完成?
- 有多少工作?
- 如果我的员工跟不上,我可以放弃任何新工作吗?
- 有多少出色的工作让我有能力?
- 如果我的程序意外终止,我愿意接受多大程度的风险?
- 缓冲区中等待的任何内容都将丢失。
如果这些问题对您正在建模的行为没有意义,那么使用大于 1 的缓冲通道可能是错误的代码味道。
场景 1 – 扇出
扇出模式允许您将明确定义的员工数量投入同时工作的问题。由于每项任务都有一名员工,因此您确切知道将收到多少报告。您可以确保箱子中有足够的空间来接收所有这些报告。这有利于您的员工无需等待您提交报告。但是,如果他们同时或几乎同时到达箱子,他们确实需要轮流将报告放入您的箱子。
想象一下,您再次成为经理,但这次您雇用了一个员工团队。您有一项希望每个员工执行的单独任务。当每个员工完成他们的任务时,他们需要向您提供一份纸质报告,该报告必须放在您办公桌上的盒子中。
清单 7
https://play.golang.org/p/8HIt2sabs_
01 func fanOut() {
02 emps := 20
03 ch := make(chan string, emps)
04
05 for e := 0; e < emps; e++ {
06 go func() {
07 time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
08 ch <- "paper"
09 }()
10 }
11
12 for emps > 0 {
13 p := <-ch
14 fmt.Println(p)
15 emps--
16 }
17 }
在清单 7 的第 03 行,创建了一个 Buffered 通道,其属性是string数据将与信号一起发送。由于emps在第 02 行声明的变量,这次通道创建了 20 个缓冲区。
在第 05 行到第 10 行之间,雇用了 20 名员工,他们立即开始工作。您不知道每个员工在第 07 行需要多长时间。然后在第 08 行,员工发送纸质报告,但这次发送不会阻止等待接收。由于每个员工的盒子里都有空间,通道上的发送只会与可能希望同时或几乎同时发送报告的其他员工竞争。
第 12 行到第 16 行之间的代码就是你的全部。您可以在此处等待所有 20 名员工完成工作并发送报告。在第 12 行,您处于循环中,而在第 13 行,您被阻塞在一个频道中,等待您的报告。收到报告后,该报告将打印在第 14 行,并且本地计数器变量递减以指示员工已完成工作。
场景 2 – 丢弃
丢弃模式允许您在员工满负荷时扔掉工作。这样做的好处是可以继续接受客户的工作,并且永远不会在接受该工作时施加压力或延迟。这里的关键是知道你什么时候真正有能力,这样你就不会对你将尝试完成的工作量投入过多或过少。通常集成测试或指标是帮助您识别此数字所需要的。
想象一下,您再次成为经理,您雇用了一名员工来完成工作。您有一项希望员工执行的个人任务。当员工完成他们的任务时,你并不关心知道他们已经完成了。重要的是您是否可以将新作品放入盒子中。如果您无法执行发送,那么您就知道您的盒子已满,员工已满员。在这一点上,新的工作需要被丢弃,这样事情才能继续前进。
清单 8
https://play.golang.org/p/PhFUN5itiv
01 func selectDrop() {
02 const cap = 5
03 ch := make(chan string, cap)
04
05 go func() {
06 for p := range ch {
07 fmt.Println("employee : received :", p)
08 }
09 }()
10
11 const work = 20
12 for w := 0; w < work; w++ {
13 select {
14 case ch <- "paper":
15 fmt.Println("manager : send ack")
16 default:
17 fmt.Println("manager : drop")
18 }
19 }
20
21 close(ch)
22 }
在清单 8 的第 03 行,创建了一个缓冲通道,其属性是string数据将与信号一起发送。由于cap第 02 行声明的常量,这次创建的通道有 5 个缓冲区。
在第 05 行到第 9 行之间,雇佣了一名员工来处理工作。Afor range用于通道接收。每次收到一张纸时,都会在第 07 行进行处理。
在第 11 行到第 19 行之间,您尝试向您的员工发送 20 张纸。这次select在第case14 行的第一个语句中使用了一个语句来执行发送。由于在default第select16 行中使用了该子句,如果发送将由于缓冲区中没有更多空间而阻塞,则发送将被放弃通过执行第 17 行。
最后在第 21 行,close针对通道调用内置函数。这将在没有数据的情况下向员工发出信号,他们已完成并在完成分配的工作后自由离开。
成本效益
大于 1 的缓冲通道不能保证接收到正在发送的信号。摆脱这种保证有一个好处,即两个 goroutine 之间的通信延迟减少或没有延迟。在“扇出”场景中,每个将要发送报告的员工都有一个缓冲区空间。在Drop场景中,缓冲区是针对容量进行测量的,如果达到容量,工作就会被删除,以便事情可以继续进行。
在这两种选择中,我们都不得不接受这种缺乏保证的情况,因为减少延迟更为重要。零到最小延迟的要求不会对系统的整体逻辑造成问题。
带数据的信号 – 延迟保证 – 缓冲通道 1
当需要在发送新信号之前知道之前发送的信号是否已收到时,Wait For Tasks场景就派上用场了。
场景 1 – 等待任务
在这种情况下,您有一名新员工,但他们要做的不仅仅是一项任务。您将一个接一个地为他们提供许多任务。但是,他们必须先完成每个单独的任务,然后才能开始新的任务。由于他们一次只能处理一项任务,因此在工作交接之间可能存在延迟问题。如果可以在不失去员工正在处理下一个任务的保证的情况下减少延迟,那可能会有所帮助。
这就是缓冲通道为 1 的好处所在。如果您和员工之间的一切都以预期的速度运行,那么你们都不需要等待对方。每次发送一张纸时,缓冲区都是空的。每次您的员工完成更多工作时,缓冲区已满。这是工作流程的完美对称。
最好的部分是这个。如果您在任何时候尝试发送一张纸,但由于缓冲区已满而无法发送,则您知道您的员工遇到了问题,然后您就停止了。这就是延迟保证的用武之地。当缓冲区为空并且您执行发送时,您可以保证您的员工已经完成了您发送的最后一项工作。如果您执行发送而您不能执行,则您可以保证他们没有执行。
清单 9
https://play.golang.org/p/4pcuKCcAK3
01 func waitForTasks() {
02 ch := make(chan string, 1)
03
04 go func() {
05 for p := range ch {
06 fmt.Println("employee : working :", p)
07 }
08 }()
09
10 const work = 10
11 for w := 0; w < work; w++ {
12 ch <- "paper"
13 }
14
15 close(ch)
16 }
在清单 9 的第 02 行,创建了一个大小为 1 的缓冲通道,其属性是string数据将与信号一起发送。在第 04 行到第 8 行之间,雇佣了一名员工来处理工作。Afor range用于通道接收。每次收到一张纸时,都会在第 06 行进行处理。
在第 10 行到第 13 行之间,您开始将您的任务发送给员工。如果您的员工跑得和您一样快,那么你们两个之间的延迟就会减少。但是,每次发送成功后,您就可以保证您提交的最后一项工作正在处理中。
最后在第 15 行,close针对通道调用内置函数。这将在没有数据的情况下向员工发出信号,他们已经完成并可以自由离开。但是,您提交的最后一份工作将在for range终止之前收到(刷新)。
没有数据的信号 – 上下文
在最后一个场景中,您将看到如何使用包中的Context值取消正在运行的 goroutine context。这一切都是通过利用关闭的无缓冲通道来执行没有数据的信号来实现的。
您是最后一次担任经理,您雇用一名员工来完成工作。这次你不愿意等待员工完成未知的时间。你在一个离散的最后期限,如果员工没有按时完成,你不愿意等待。
清单 10
https://play.golang.org/p/6GQbN5Z7vC
01 func withTimeout() {
02 duration := 50 * time.Millisecond
03
04 ctx, cancel := context.WithTimeout(context.Background(), duration)
05 defer cancel()
06
07 ch := make(chan string, 1)
08
09 go func() {
10 time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
11 ch <- "paper"
12 }()
13
14 select {
15 case p := <-ch:
16 fmt.Println("work complete", p)
17
18 case <-ctx.Done():
19 fmt.Println("moving on")
20 }
21 }
在清单 10 的第 02 行,声明了一个持续时间值,它表示员工必须完成任务的时间。该值用于第 04 行以创建context.Context超时为 50 毫秒的值。包中的WithTimeout函数context返回一个Context值和一个取消函数。
该context包创建了一个 goroutine,Context一旦达到持续时间,它将关闭与该值关联的无缓冲通道。cancel无论结果如何,您都有责任调用该函数。这将清理为Context. cancel可以多次调用该函数。
在第 05 行,cancel一旦该函数终止,该函数就会被推迟执行。在第 07 行,创建了一个 1 的缓冲通道,员工将使用该通道向您发送他们的工作结果。然后在第 09 行到第 12 行,雇员被雇用并立即投入工作。您不知道员工需要多长时间才能完成。
在第 14 行到第 20 行之间,您使用该select语句在两个通道上接收。在第 15 行接收,您等待员工向您发送他们的结果。在第 18 行接收,您等待查看context包是否会发出 50 毫秒已到的信号。无论您首先收到哪个信号,都将被处理。
该算法的一个重要方面是使用 1 的缓冲通道。如果员工没有按时完成,您将继续前进,而不会给员工任何通知。从员工的角度来看,他们总是会在第 11 行将报告发送给您,无论您是否在场,他们都视而不见。如果您使用无缓冲通道,如果您继续前进,员工将永远阻止向您发送报告。这会造成 goroutine 泄漏。因此,使用 1 的缓冲通道来防止这种情况发生。
结论
在使用通道(或并发)时,要了解和理解围绕保证、通道状态和发送的信令属性很重要。它们将帮助指导您实现您正在编写的并发程序和算法所需的最佳行为。他们将帮助您找到错误并嗅出潜在的错误代码。
在这篇文章中,我分享了一些示例程序,它们展示了信号的属性如何在不同场景中工作。每条规则都有例外,但这些模式是开始的良好基础。
回顾这些大纲,总结何时以及如何有效地思考和使用渠道
语言力学
-
使用通道来编排和协调 goroutine。
- 关注信令属性而不是数据共享。
- 有数据或没有数据的信令。
- 质疑它们用于同步访问共享状态的用途。
- 在某些情况下,渠道可以更简单,但最初是疑问。
-
无缓冲通道:
- 接收发生在发送之前。
- 优点:100% 保证信号已被接收。
- 成本:接收信号时的未知延迟。
-
缓冲通道:
- 发送发生在接收之前。
- 好处:减少信号之间的阻塞延迟。
- 成本:无法保证何时收到信号。
- 缓冲区越大,保证越少。
- 缓冲区为 1 可以为您提供一次延迟发送保证。
-
关闭渠道:
- 关闭发生在接收之前(如缓冲)。
- 没有数据的信令。
- 完美的信号取消和截止日期。
-
零频道:
- 发送和接收块。
- 关闭信号
- 非常适合限速或短期停工。
设计理念
-
如果通道上的任何给定 Send 可以导致发送 goroutine 阻塞:
- 不允许使用大于 1 的缓冲通道。
- 大于 1 的缓冲区必须有原因/度量。
- 必须知道发送 goroutine 阻塞时会发生什么。
-
如果通道上的任何给定 Send 不会导致发送 goroutine 阻塞:
- 您有每次发送的确切缓冲区数。
- 扇出模式
- 您测量了最大容量的缓冲区。
- 掉落图案
-
缓冲区少即是多。
- 在考虑缓冲区时不要考虑性能。
- 缓冲区可以帮助减少信号之间的阻塞延迟。
- 将阻塞延迟降低到零并不一定意味着更好的吞吐量。
- 如果一个缓冲区为您提供足够好的吞吐量,请保留它。
- 问题缓冲区大于 1 并测量大小。
- 找到可能提供足够好的吞吐量的最小缓冲区。
- 在考虑缓冲区时不要考虑性能。