channel#
上一章学习了 goroutine 的使用方式。但是单纯的将函数并发执行并没有意义。函数与函数间需要交换数据才能体现并发执行的意义。虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞态问题。为了保证数据交换的正确性,很多并发模型中必须使用互斥量对内存进行加锁,但这种做法势必造成性能问题。
Go语言采用的并发模型是 CSP(Communicating Sequential Processes) 提倡:通过通信共享内存而不是通过共享内存实现通信。
如果把 goroutine 比作是Go程序并发的执行体,channel就是它们之间的连接。channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。
Go语言中的通道 channel 是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是说:声明 channel 时需要为其指定元素类型。
channel 类型#
channel 是 Go 语言中一种特有的类型。声明格式如下:
var 变量名称 chan 元素类型其中:
chan:关键字- 元素类型:是指通道中传递元素的类型
比如:
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道channel 零值#
未初始化的通道类型变量其默认零值是 nil。
var ch chan int
fmt.Println(ch) // <nil>channel 初始化#
声明的通道类型变量需要使用内置的 make 函数初始化之后才能使用。格式如下:
make(chan 元素类型, [缓冲大小])其中:channel 的缓冲大小是可选的。比如:
ch4 := make(chan int)
ch5 := make(chan bool, 1) // 声明一个缓冲区大小为1的通道channel 操作#
通道共有 发送(send)、接收(receive)和 关闭(close) 三种操作。发送和接收操作都使用<-符号。
比如定义一个通道:
ch := make(chan int)发送 send#
将一个值发送到通道中。
ch <- 10 // 把10发送到ch中接收 receive#
从一个通道中接收值。
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果关闭 close#
通过调用内置的close函数来关闭通道。
close(ch)注意:通道值是可以被垃圾回收掉的。通道通常由发送方执行关闭操作,并且只有在接收方明确等待通道关闭的信号时才需要执行关闭操作。它和关闭文件不一样,通常在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
- 对一个关闭的通道再发送值会导致
panic - 对一个关闭的通道进行接收会一直获取值直到通道为空
- 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值
- 关闭一个已经关闭的通道会导致
panic
无缓冲的通道#
无缓冲的通道又称为阻塞的通道。看如下代码片段:
func chan1() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功") // fatal error: all goroutines are asleep - deadlock!
}上面这段代码能够通过编译,但是执行的时候会出现以下错误:
fatal error: all goroutines are asleep - deadlock!deadlock 表示程序中的 goroutine 都被挂起导致程序死锁。导致死锁的原因在于:
ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。同理如果对一个无缓冲通道执行接收操作时,没有任何向通道中发送值的操作那么也会导致接收操作阻塞。简单理解就是:无缓冲的通道必须有至少一个接收方才能发送成功
上面的代码会阻塞在 ch <- 10 这一行代码形成死锁,解决这个问题其中一种可行的方法是创建一个 goroutine 去接收值,例如:
func chan2() {
value := 10
ch := make(chan int)
go func(chan int) {
value := <-ch
fmt.Printf("接收成功,value=%d \n", value)
}(ch)
ch <- value
fmt.Printf("发送成功,value=%d \n", value)
}首先无缓冲通道 ch 上的发送操作会阻塞,直到另一个 goroutine 在该通道上执行接收操作,这时数字10才能发送成功,两个 goroutine 将继续执行。相反,如果接收操作先执行,接收方所在的 goroutine 将阻塞,直到 main goroutine 中向该通道发送数字10。
使用无缓冲通道进行通信将导致发送和接收的 goroutine 同步化。因此无缓冲通道也被称为 同步通道。
有缓冲的通道#
还有另外一种解决上面死锁问题的方法是:使用有缓冲区的通道。可以在使用 make 函数初始化通道时为其指定通道的容量,例如:
func main() {
ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
}只要通道的容量大于零,那么该通道就属于有缓冲的通道,通道的容量表示通道中最大能存放的元素数量。当通道内已有元素数达到最大容量后,再向通道执行发送操作就会阻塞,除非有从通道执行接收操作。可以使用内置的 len 函数获取通道内元素的数量,使用 cap 函数获取通道的容量但很少会这么做。
多返回值模式#
当向通道中发送完数据时可以通过 close 函数来关闭通道。当一个通道被关闭后,再往该通道发送值会引发panic,从该通道取值的操作会先取完通道中的值。通道内的值被接收完后再对通道执行接收操作得到的值会一直都是对应元素类型的零值。那如何判断一个通道是否被关闭呢?
对一个通道执行接收操作时支持使用如下多返回值模式:
value, ok := <- ch其中
value:从通道中取出的值,如果通道被关闭则返回对应类型的零值ok:通道关闭时返回false,否则返回true
循环从通道ch中接收所有值,直到通道被关闭后退出。
func chan4() {
ch := make(chan int, 10)
for i := 0; i < 5; i++ {
wg.Add(1)
ch <- i
}
close(ch)
go func(chan int) {
// 循环接收值,直到通道被关闭后退出
for {
v, ok := <-ch
if !ok {
break
} else {
wg.Done()
fmt.Printf("v=%v ok=%v \n", v, ok)
}
}
}(ch)
wg.Wait()
}for range接收值#
通常会选择使用 for range 循环从通道中接收值,当通道被关闭后会在通道内的所有值被接收完毕后会自动退出循环。上面示例使用 for range 改写后会很简洁。
func chan5() {
ch := make(chan int, 10)
for i := 0; i < 5; i++ {
wg.Add(1)
ch <- i
}
close(ch)
go func() {
for v := range ch {
fmt.Println(v)
wg.Done()
}
}()
wg.Wait()
}注意:目前Go语言中并没有提供一个不对通道进行读取操作就能判断通道是否被关闭的方法。不能简单的通过 len(ch) 操作来判断通道是否被关闭。
单向通道#
某些场景下可能会将通道作为参数在多个任务函数间进行传递,通常会选择在不同的任务函数中对通道的使用进行限制,比如限制通道在某个函数中只能执行发送或只能执行接收操作。比如:现在有 Producer 和 Consumer 两个函数,其中 Producer 函数会返回一个通道,并且会持续将符合条件的数据发送至该通道,并在发送完成后将该通道关闭。而 Consumer 函数的任务是从通道中接收值进行计算,这两个函数之间通过 Processer 函数返回的通道进行通信。示例代码如下:
package main
import (
"fmt"
)
// Producer 返回一个通道
// 并持续将符合条件的数据发送至返回的通道中
// 数据发送完成后会将返回的通道关闭
func Producer() chan int {
ch := make(chan int, 2)
// 创建一个新的goroutine执行发送数据的任务
go func() {
for i := 0; i < 10; i++ {
if i%2 == 1 {
ch <- i
}
}
close(ch) // 任务完成后关闭通道
}()
return ch
}
// Consumer 从通道中接收数据进行计算
func Consumer(ch chan int) int {
sum := 0
for v := range ch {
fmt.Println(v)
sum += v
}
return sum
}
func main() {
ch := Producer()
res := Consumer(ch)
fmt.Println(res) // 25
}上面的示例代码中可以看出正常情况下 Consumer 函数中只会对通道进行接收操作,但这不代表不可以在 Consumer 函数中对通道进行发送操作。作为 Producer 函数的提供者在返回通道的时候可能只希望调用方拿到返回的通道后只能对其进行接收操作。但是没有办法阻止在 Consumer 函数中对通道进行发送操作。比如:
func Producer2() chan int {
ch := make(chan int, 2)
// 创建一个新的goroutine执行发送数据的任务
go func() {
for i := 0; i < 10; i++ {
if i%2 == 1 {
ch <- i
}
}
close(ch) // 任务完成后关闭通道
}()
return ch
}
// Consumer 从通道中接收数据进行计算同时可以向通道中写值
func Consumer2(ch chan int) int {
sum := 0
ch <- 100
for v := range ch {
sum += v
}
return sum
}Go语言中提供了单向通道来处理这种需要限制通道只能进行某种操作的情况。
<- chan int // 只接收通道,只能接收不能发送
chan <- int // 只发送通道,只能发送不能接收其中,箭头 <- 和关键字 chan 的相对位置表明了当前通道允许的操作,这种限制将在编译阶段进行检测。另外对一个只接收通道执行 close 也是不允许的,因为默认通道的关闭操作应该由发送方来完成。
使用单向通道将上面的示例代码进行如下改造。
// Producer3 返回一个接收通道
func Producer3() <-chan int {
ch := make(chan int, 2)
go func() {
for i := 0; i < 10; i++ {
if i%2 == 1 {
ch <- i
}
}
close(ch)
}()
return ch
}
// Consumer3 参数为接收通道
func Consumer3(ch <-chan int) int {
sum := 0
for v := range ch {
sum += v
}
return sum
}Producer3 函数返回的是一个只接收通道,这就从代码层面限制了该函数返回的通道只能进行接收操作,保证了数据安全。并且返回限制操作的单向通道也会让代码语义更清晰、更易读。
在函数传参及任何赋值操作中全向通道(正常通道)可以转换为单向通道,但是无法反向转换。
// channel类型转换
func convertChannel() {
// 双向通道
var ch chan int = make(chan int)
// 单向写
var sendCh chan<- int = make(chan<- int)
// 单向读
var recvCh <-chan int = make(<-chan int)
// 双向channel 可以 隐式转换为 任意一种单向channel
sendCh = ch
recvCh = ch
// 单向 channel 不能转换为 双向 channel
// ch = recvCh // cannot use recvCh (variable of type <-chan int) as chan int value in assignment
// ch = sendCh // cannot use sendCh (variable of type chan<- int) as chan int value in assignment
fmt.Printf("sendCh:%T,recvTh:%T", sendCh, recvCh)
}总结#
表格中总结了对不同状态下的通道执行相应操作的结果。

注意:对已经关闭的通道再执行 close 也会引发 panic。
select 多路复用#
某些场景下可能需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以被接收那么当前 goroutine 将会发生阻塞。可能会写出如下代码尝试使用遍历的方式来实现从多个通道中接收值:
for{
// 尝试从ch1接收值
data, ok := <-ch1
// 尝试从ch2接收值
data, ok := <-ch2
…
}这种方式虽然可以实现从多个通道接收值的需求,但是程序的运行性能比较差。Go 语言内置了select关键字,使用它可以同时响应多个通道的操作。
select 的使用方式类似于 switch 语句,它也有一系列 case 分支和一个默认的分支。每个 case 分支会对应一个通道的通信(接收或发送)过程。select 会一直等待,直到其中的某个 case 的通信操作完成时,就会执行该 case 分支对应的语句。格式如下:
select {
case <-ch1:
//...
case data := <-ch2:
//...
case ch3 <- 10:
//...
default:
//默认操作
}select 语句具有以下特点:
- 可处理一个或多个
channel的发送/接收操作 - 如果多个 case 同时满足,
select会随机选择一个执行 - 对于没有 case 的 select 会一直阻塞,可用于阻塞 main 函数,防止退出
下面示例代码能够在终端打印出10以内的奇数,借助这个代码片段来看一下 select 的具体使用。
package main
import "fmt"
func main() {
ch := make(chan int, 1)
for i := 1; i <= 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}上面的代码输出内容如下。
1
3
5
7
9示例中的代码首先是创建了一个缓冲区大小为1的通道 ch,在进入 for 循环后,此时 i = 1,select 语句中包含两个 case 分支,此时由于通道中没有值可以接收,所以x := <-c 这个 case 分支不满足,而ch <- i这个分支可以执行,会把1发送到通道中,结束本次 for 循环;第二次 for 循环时,i = 2,由于通道缓冲区已满,所以ch <- i这个分支不满足,而x := <-ch这个分支可以执行,从通道接收值1并赋值给变量 x ,所以会在终端打印出 1;后续的 for 循环同理会依次打印出3、5、7、9。
通道误用示例#
接下来,我们将展示两个因误用通道导致程序出现 bug 的代码片段,希望能够加深读者对通道操作的印象。
示例1#
各位读者可以查看以下示例代码,尝试找出其中存在的问题。
// demo1 通道误用导致的bug
func demo1() {
wg := sync.WaitGroup{}
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
wg.Add(3)
for j := 0; j < 3; j++ {
go func() {
for {
task := <-ch
// 这里假设对接收的数据执行某些操作
fmt.Println(task)
}
wg.Done()
}()
}
wg.Wait()
}将上述代码编译执行后,匿名函数所在的 goroutine 并不会按照预期在通道被关闭后退出。因为task := <- ch的接收操作在通道被关闭后会一直接收到零值,而不会退出。此处的接收操作应该使用task, ok := <- ch,通过判断布尔值ok为假时退出;或者使用select 来处理通道。
示例2#
各位读者阅读下方代码片段,尝试找出其中存在的问题。
// demo2 通道误用导致的bug
func demo2() {
ch := make(chan string)
go func() {
// 这里假设执行一些耗时的操作
time.Sleep(3 * time.Second)
ch <- "job result"
}()
select {
case result := <-ch:
fmt.Println(result)
case <-time.After(time.Second): // 较小的超时时间
return
}
}上述代码片段可能导致 goroutine 泄露(goroutine 并未按预期退出并销毁)。由于 select 命中了超时逻辑,导致通道没有消费者(无接收操作),而其定义的通道为无缓冲通道,因此 goroutine 中的ch <- "job result"操作会一直阻塞,最终导致 goroutine 泄露。