浅谈 golang channel

2017-09-12 19:00:00 蒋涛 点融黑帮


    golang(后文简称go)在语言层面提供了goroutine这种并发机制,goroutine底层基于I/O复用并可以有效利用多核CPU,多线程/多进程的应用意味着我们需要处理进程通信,常见的IPC方式包括共享内存和消息传递,两种方式都有各自的应用场景,而这两种方式在go中都有相应的实现,go为前者提供了sync包用于同步操作,为后者实现了channel用于goroutine之间的通信,下面将简单介绍go channel的基本用法。



基本用法


channel 分为无缓冲(unbuffered)和缓冲的(buffered),语法上的区别在于声明时的长度,并且默认是可读可写的,关键字chan后面跟上相应的类型表示仅可读写此类型的值,另外加上箭头表示只读或只写。

    c0 := make(chan int)       // unbuffered
    c1 := make(chan int, 0)    // unbuffered
    c2 := make(chan string, 1) // buffered

    c3 := make(<-chan int) // can only read
    c4 := make(chan<- int) // can only write
    c5 := make(chan int)   // can write and read

使用箭头 <- 来读写一个channel,无缓冲的channel 常用于同步的场景,当向一个无缓冲的channel 发送消息,另一端没有人接收,或者当从一个无缓冲的channel 接收消息,另一端没有人发送时,当前goroutine都会阻塞。下面是一个简单的例子,主goroutine会在接收c的时候阻塞,直到匿名函数中向c写入一个0。

    c := make(chan int)    
   go func(ch chan int) {        ch <- 0    }(c)    <-c

也可以使用range读取一个channel,循环会持续到channel关闭

    c := make(chan int)    
   go func(ch chan int) {        c <- 1        close(c)    }(c)    
   for i := range c {        fmt.Println(i)    }

而缓冲的channel只有当缓冲区满或者空,写入和读取的时候才会发生阻塞,所以缓冲的channel可以当做简单的队列来使用

    c := make(chan int, 10)

    c <- 1
    c <- 2

    fmt.Println(<-c, <-c)

如果以上代码稍作更改,在channel为空时再读取一次,则会发生死锁,代码如下

    c := make(chan int, 10)

    c <- 1
    c <- 2

    fmt.Println(<-c, <-c,<-c)

而运行错误信息为

fatal error: all goroutines are asleep - deadlock!

死锁的原因在于channel的另一端没有goroutine写入了,当缓冲的channel为空时,主goroutine会一直阻塞在读取,同时也没有其他goroutine可以调度。


安全的关闭channel


go提供了一个内置的方法close()用于关闭一个channel,需要注意的是:

  • 只能关闭一个双向或者可写的channel。

  • 对于同一个channel,多次调用close(),会导致panic。

  • 对一个已关闭的channel写数据,会导致panic。

  • 从一个已关闭的channel中读数据,不会panic,会读到channel对应类型的0值,比如int为0,bool就为false,但是这样无法确定读取到的是否是正确的数据,所以一般会使用channel返回的第二个可选参数来判断channel是否关闭。

    c := make(chan int)    
   close(c)    
   if _, ok := <-c; !ok {        fmt.Println("channel closed")    }

鉴于向一个已关闭的channel发送数据会导致panic,所以一般由发送者调用close()关闭channel,因为发送数据的一方清楚什么时候应该停止数据的写入;同时,在channel关闭之后,所有因为读取这个channel而阻塞的goroutine会立即往后执行,利用这一点可以实现简单的广播。

    stopCh := make(chan struct{})    
   for i := 0; i < 5; i++ {        
       go func(i int) {            <-stopCh            fmt.Printf("goroutine %d stopped\n", i)        }(i)    }    
   close(stopCh)    time.Sleep(time.Second * 1)

以上代码在主goroutine中关闭channel,其他goroutine会立即退出

goroutine 2 stopped
goroutine 4 stopped
goroutine 1 stopped
goroutine 0 stopped
goroutine 3 stopped

需要注意的一种情况是只声明未赋值的channel,即 nil channel,close 一个nil channel会导致panic,而读写一个nil channel会永久阻塞。


   Select  


从一个无缓冲的channel读取数据会阻塞,如果需要从多个channel读取数据呢?这个时候就需要配合select关键词使用

select关键字的灵感来源与unix中的I/O多路复用函数select(现已被epoll、kqueue等替代),unix中的select函数监听多个文件描述符,当select返回时,会得到可读或可写的描述符集合,这种技术实现了在一个线程内处理多个套接字连接。

在go中使用select - case 可以在多个channel上监听读写事件,某个case产生了读写事件时,则执行相应case中的代码

下面例子中第二个case从c1读取,c1另一端有一个goroutine写入,所以执行第二个case中的代码

    c1 := make(chan int)
    c2 := make(chan int)    
   go func() {        c1<-1    }()    
   select {    
   case c2 <- 1:        fmt.Println("write to c2")    
   case <-c1:        fmt.Println("read from c1")    }

在实际的应用场景中,需要循环对多个channel中的某一个读写数据,比如服务器编程中,经常使用for - select的方式循环检测多个channel的事件,下面是一个简单的tcp服务的例子。

  • 初始化了两个channel,一个用于读写tcp连接的地址,另一个用于接收系统信号

  • 每建立一个tcp连接,发送连接的远端地址到msg中

  • 在主goroutine中使用for select 循环检测msg和sig的事件,并优先匹配系统信号的事件。

func handleConn(conn net.Conn, msg chan string) {    
   defer conn.Close()    io.WriteString(conn,"hello")    msg <- conn.RemoteAddr().String() }
   
func main() {    msg := make(chan string)    sig := make(chan os.Signal)    signal.Notify(sig, os.Interrupt)    
   go func() {        l, err := net.Listen("tcp", ":8000")        
       if err != nil {            
           panic(err)        }        
       for {            conn, err := l.Accept()            
           if err != nil {                
               if netErr, ok := err.(net.Error); ok && netErr.Temporary() {                    
                   continue                } else {                    
                   panic(err)                }            }            
           go handleConn(conn, msg)        }    }()    
   for {        
   select {        
   case <-sig:            fmt.Println("exit")            os.Exit(0)        
   case addr := <-msg:            fmt.Println(addr)        }    } }


Worker pool


在web开发的时候,可以很容易地使用关键字go将长耗时的任务放到其他goroutine中执行,比如

    http.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) {        //long time task
        go func() {            //...
        }()

        rw.WriteHeader(http.StatusOK)
    })

但是这样做的缺点也很明显:无法控制并发数量,标准库的http server在一个单独的goroutine中处理每一个用户请求,每个请求又可能会创建1到多个goroutine,如果某一段时间用户请求量很高,会导致服务器在短时间内创建大量的goroutine,在高并发的场景中,这样做是会影响性能的,这个时候,我们希望用一些手段来控制并发数量,比较常见的做法是使用queue+worker pool的方式,这样后台可以任意调整worker的数量来控制任务的处理速度。

以下是一个简单的worker pool,以一个缓冲的channel作为任务队列,用一个函数来表示相应的任务,worker循环从queue中读取并执行相应的任务,直到收到stop的信号。

type Task func()

type Pool struct {    maxWorkers int    queue      chan Task    done       chan struct{} }

func (p Pool) Start() {    
   for i := 0; i < p.maxWorkers; i++ {        worker := Worker{i}        
       go worker.Consume(p.queue, p.done)    } }

func
(p Pool) Stop() {    
   close(p.done) }

type Worker struct{ id int }

func (w Worker) Consume(queue chan Task, stopCh chan struct{}) {    
   for {        
       select {        
       case <-stopCh:            fmt.Printf("worker %d stopped\n", w.id)            
           return        case task := <-queue:            task()        }    } }
func main() {    done := make(chan struct{})    queue := make(chan Task, 100)    
   for i := 0; i < 100; i++ {        i := i                queue <- func() {
           fmt.Println(i)
       } //just print something    }    pool := Pool{        maxWorkers: 5,        queue:      queue,        done:       done,    }    pool.Start()    //stop pool after 5 seconds    time.Sleep(time.Second * 5)    pool.Stop() }


并行运算


go 1.5之后,GOMAXPROCS被默认设置为CPU的核心数量,goroutine会被调度到多个系统线程之上,这意味着我们可以利用多核CPU的性能做一些并行运算,比如:计算10000个文件的MD5值,生成10W个图片的缩略图等等,在这些场景中,我们可以将任务切分为小块,分派个多个goroutine执行,最后通过channel将结果汇合到一起。

下面是一个使用蒙特卡洛法计算PI近似值的一个例子,基本的思想是利用圆的面积与其外接正方形的面积之比为PI/4,通过产生大量均匀分布的二维点,计算落在圆和正方形内的点的比例,再乘以4,就可以得到PI的近似值,随着样本数量的增加,结果会越来越接近PI。

首先是一个单核的版本

func MonteCarloPI(samples int) float64 {
    inside := 0 //indicates point is inside the circle
    r := rand.New(rand.NewSource(time.Now().UnixNano()))    
   for i := 0; i < samples; i++ {        x := r.Float64()        y := r.Float64()        
       if x*x+y*y < 1 {            inside++        }    }    
   return float64(inside) / float64(samples) * 4}
   
func main() {    fmt.Println(MonteCarloPI(10000000))    fmt.Println(MonteCarloPI(100000000))    fmt.Println(math.Pi) }

结果如下,前两个分别为一千万和一亿样本时的结果,最后一行为go标准库math中的PI

3.1417284
3.1416146
3.141592653589793...

当样本数量为1亿时,耗时已经达到了数秒钟。以下例子取CPU核心数作为worker的数量,每个goroutine计算出一个PI的近似值,通过results这个channel汇集goroutine计算的结果,最后求平均值。

func MonteCarloMultiCore(samples int) float64 {
    workers := runtime.NumCPU()
    results := make(chan float64, workers)

    threadSamples := samples / workers    
   for i := 0; i < workers; i++ {        
       go func() {            results<-MonteCarloPI(threadSamples)        }()    }    
   var total float64    for i := 0; i < workers; i++ {        total += <-results    }    
   return total / float64(workers) }

在10亿的样本情况下,单核和多核的结果如下,可以看到提升是很明显的

    MonteCarloPI 27.233842675s
    MonteCarloMultiCore 5.598302862s

更多关于channel的资料,可以参考如下链接

  • Effective go

    (https://golang.org/doc/effective_go.html#concurrency)

  • Go concurrency patterns

    (https://blog.golang.org/pipelines)

  • Concurrency is not parallelism

    (https://blog.golang.org/concurrency-is-not-parallelism)