Lain's Blog

The Go Language: Concurrency

Concurrent programs in Go are built on two things: goroutines and channels.

What is a goroutine?

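Goroutines are widely regarded as Go's implementation of coroutines. The book 《Go语言编程》 says that a goroutine is a lightweight thread (that is, a coroutine; p. 90 of the book). In Chapter 9, on advanced topics, the author says it again: "Fundamentally, a goroutine is simply Go's version of a coroutine" (p. 204). But Rob Pike, one of Go's creators, does not describe it that way.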

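"A goroutine is a Go function or method executing concurrently in the same address space as other goroutines. A running program consists of one or more goroutines. It's not the same as a thread, coroutine, process, etc. It's a goroutine."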

1. goroutine vs coroutine

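Goroutines are supported natively by the Go language. Compared with coroutines, which are usually implemented by a library, goroutines are more powerful because their scheduling is, to a degree, managed by the Go runtime. One benefit is that when a goroutine blocks (for example, on synchronous I/O), the CPU is automatically handed over to other goroutines.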

2. goroutine vs thread

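A goroutine is not equivalent to a thread. Goroutines exist precisely to replace the thread as the smallest unit of scheduling. When a goroutine is run, the runtime first looks at the current thread; if that thread is blocked, the goroutine is assigned to an idle thread, and if there is no idle thread, a new one is created. Note that when a goroutine finishes, its thread is not reclaimed and does not exit; it becomes an idle thread.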
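As a rough illustration that goroutines are not threads, the sketch below parks a large number of goroutines on a channel and reports how many were added, next to the number of threads allowed to run Go code at once. The helper name `parkedCount` and the figure of 10000 are assumptions chosen for illustration, not something from the original post.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// parkedCount starts n goroutines that all block on a channel, measures how
// many goroutines were added while they are blocked, then releases them.
func parkedCount(n int) int {
	before := runtime.NumGoroutine()
	stop := make(chan struct{})
	var started, finished sync.WaitGroup
	started.Add(n)
	finished.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer finished.Done()
			started.Done()
			<-stop // parked here; a blocked goroutine does not occupy an OS thread
		}()
	}
	started.Wait()
	added := runtime.NumGoroutine() - before
	close(stop)
	finished.Wait()
	return added
}

func main() {
	fmt.Println("goroutines added:", parkedCount(10000))
	// GOMAXPROCS(0) only queries the current setting.
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
}
```

The goroutine count is typically far larger than the GOMAXPROCS value, which is the point: many goroutines are multiplexed onto a small number of threads.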

Using goroutines

package main

import (
    "fmt"
    "time"
)

// ready waits sec seconds, then reports that w is ready.
func ready(w string, sec int64) {
    time.Sleep(time.Duration(sec) * time.Second)
    fmt.Println(w, "is ready!")
}

func main() {
    go ready("Tea", 2)
    go ready("Coffee", 1)
    fmt.Println("I'm waiting")
    time.Sleep(5 * time.Second)
}

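Why does the main goroutine have to sleep? Because a Go program exits as soon as main returns, without waiting for other goroutines. Without the sleep, the program would most likely exit before either message is printed.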
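Sleeping for a fixed time is only a guess at how long the goroutines need. One common alternative is `sync.WaitGroup` from the standard library. The sketch below is a variation on the example above; the `brew` helper and its `unit` parameter are hypothetical names added so the wait can be shortened when experimenting.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// brew starts both goroutines and returns only after both have finished.
// It returns the names in the order they became ready.
func brew(unit time.Duration) []string {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		order []string
	)
	ready := func(w string, d time.Duration) {
		defer wg.Done()
		time.Sleep(d)
		fmt.Println(w, "is ready!")
		mu.Lock()
		order = append(order, w)
		mu.Unlock()
	}
	wg.Add(2)
	go ready("Tea", 2*unit)
	go ready("Coffee", 1*unit)
	fmt.Println("I'm waiting")
	wg.Wait() // blocks until both goroutines have called wg.Done
	return order
}

func main() {
	fmt.Println(brew(time.Second))
}
```

Here main waits exactly as long as the slowest goroutine needs, rather than a fixed five seconds.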

Using channels

A channel is a pipe for communication between goroutines.

package main

import (
    "fmt"
    "time"
)

var c chan int

func ready(w string, sec int) {
    time.Sleep(time.Duration(sec) * time.Second)
    fmt.Println(w, "is ready!")
    c <- 1 // tell main that this goroutine is done
}

func main() {
    c = make(chan int)
    go ready("Tea", 2)
    go ready("Coffee", 1)
    fmt.Println("I'm waiting, but not too long")
    <-c
    <-c
}

Understanding channels further

Channels come in two kinds: buffered and unbuffered. The default is unbuffered.

c1 := make(chan int)      // unbuffered
c2 := make(chan int, 0)   // also unbuffered: capacity 0
c3 := make(chan int, 100) // buffered, capacity 100

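With a buffered channel, "put" first and "take" later: a send succeeds as long as the buffer has room, even if no receiver is ready yet.
With an unbuffered channel, "take" first and "put" second: a send blocks until a receiver is ready.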
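The difference can be seen with a non-blocking send. This is a minimal sketch; `trySend` is a hypothetical helper that uses `select` with a `default` case so the program reports the outcome instead of blocking.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	// No receiver is waiting, so the send on the unbuffered channel cannot proceed.
	fmt.Println("unbuffered send without receiver:", trySend(unbuffered, 1)) // false
	// The buffered channel has one free slot, so the first send succeeds.
	fmt.Println("buffered first send:", trySend(buffered, 1)) // true
	// The buffer is now full, so the second send cannot proceed.
	fmt.Println("buffered second send:", trySend(buffered, 2)) // false
	fmt.Println("value taken from buffer:", <-buffered)        // 1
}
```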

Concurrent programming, illustrated

1. One channel, one goroutine: one writes, one reads

package main

func main() {
    ch := make(chan int)

    go func() {
        ch <- 42
    }()

    <-ch
} 

[Figure: hello]

2. Adding a timer

package main

import "time"

func timer(d time.Duration) <-chan int {
    c := make(chan int)
    go func() {
        time.Sleep(d)
        c <- 1 
    }()
    return c 
}

func main() {
    for i := 0; i < 24; i++ {
        // i is an int, so convert it before multiplying by a Duration.
        c := timer(time.Duration(i) * time.Second)
        <-c
    }
}

[Figure: timers]
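The standard library already offers a channel-based timer of the same shape: `time.After(d)` returns a channel that delivers a value once `d` has elapsed. A minimal sketch of using it to put a time limit on a receive follows; `waitOrTimeout` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"time"
)

// waitOrTimeout waits for a value on c, giving up after d.
func waitOrTimeout(c <-chan int, d time.Duration) (int, bool) {
	select {
	case v := <-c:
		return v, true
	case <-time.After(d):
		return 0, false
	}
}

func main() {
	c := make(chan int)
	go func() {
		time.Sleep(10 * time.Millisecond)
		c <- 1
	}()
	v, ok := waitOrTimeout(c, time.Second)
	fmt.Println(v, ok)

	// Nobody ever sends on this channel, so the timeout case fires.
	never := make(chan int)
	_, ok = waitOrTimeout(never, 20*time.Millisecond)
	fmt.Println(ok)
}
```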

3. Ping-pong pattern

package main

import "time"

func main() {
    var Ball int
    table := make(chan int)
    go player(table)
    go player(table)

    table <- Ball
    time.Sleep(1 * time.Second)
    <-table
}

func player(table chan int) {
    for {
        ball := <-table
        ball++
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

[Figure: pingpong]

With three players

go player(table)
go player(table)
go player(table)

[Figure: pingpong3]

With 100 players

for i := 0; i < 100; i++ {
    go player(table)
}

[Figure: pingpong100]

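Goroutines waiting to receive from a given channel are served in FIFO order. (This reflects how the runtime queues waiting receivers; the language specification does not promise it.)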

4. Fan-in

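Fan-in is a popular programming pattern in the concurrent world (its opposite is fan-out).
A fan-in function reads from multiple inputs and multiplexes them onto a single channel.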

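Note: in software engineering, the fan-in of a module is the number of higher-level modules that call it. The larger the fan-in, the more higher-level modules share the module, which is of course desirable. But high fan-in should not be pursued at any cost. Lumping unrelated functions together into one module, for example, raises its fan-in, but such a module inevitably has low cohesion. That is something to avoid.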

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration) {
    var i int
    for {
        ch <- i
        i++
        time.Sleep(d)
    }
}

func reader(out chan int) {
    for x := range out {
        fmt.Println(x)
    }
}

func main() {
    ch := make(chan int)
    out := make(chan int)
    go producer(ch, 100*time.Millisecond)
    go producer(ch, 100*time.Millisecond)
    go reader(out)
    for i := range ch {
        out <- i
    } 
}

[Figure: fanin]
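The example above runs forever. For inputs that finish, a fan-in function also has to decide when to close its output. The sketch below is one common way to do that with `sync.WaitGroup`; the names `produce` and `merge` are assumptions for illustration and do not come from the original post.

```go
package main

import (
	"fmt"
	"sync"
)

// produce sends n values starting at start, then closes its channel.
func produce(start, n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- start + i
		}
	}()
	return ch
}

// merge fans several input channels into one output channel and closes
// the output once every input has been drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	sum := 0
	for v := range merge(produce(0, 3), produce(100, 3)) {
		fmt.Println(v) // arrival order is not deterministic
		sum += v
	}
	fmt.Println("sum:", sum) // sum: 306
}
```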

5. Workers pattern (fan-out)

The opposite of fan-in: multiple functions read tasks from a single channel, spreading the work across the CPU cores.

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(taskCh <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-taskCh
        if !ok {
            return 
        }
        d := time.Duration(task) * time.Millisecond
        time.Sleep(d)
        fmt.Println("processing task", task)
    }
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
    tasksCh := make(chan int)

    for i := 0; i < workers; i++ {
        go worker(tasksCh, wg)
    }

    for i := 0; i < tasks; i++ {
        tasksCh <- i
    }

    // Closing the channel makes every worker's receive report !ok, so it returns.
    close(tasksCh)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(36)
    go pool(&wg, 36, 50)
    wg.Wait()
}

[Figure: workers]
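The workers above only print. When the results are needed, a second channel can carry them back. This is a minimal sketch under that assumption; `sumOfSquares` and the squaring task are hypothetical stand-ins for real work.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares distributes the inputs over a fixed number of workers and
// returns the sum of the squares they compute.
func sumOfSquares(workers int, inputs []int) int {
	tasks := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for n := range tasks { // the loop ends when tasks is closed
				results <- n * n
			}
		}()
	}

	// Feed the tasks, then close the channel so the workers can exit.
	go func() {
		for _, n := range inputs {
			tasks <- n
		}
		close(tasks)
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(sumOfSquares(4, []int{1, 2, 3, 4, 5})) // prints 55
}
```

Feeding tasks from a separate goroutine matters here: if the caller sent tasks itself before reading results, the workers would block on the unbuffered results channel and the program would deadlock.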

A more complex example:

package main

import (
    "fmt"
    "sync"
    "time"
)

const (
    WORKERS     = 5
    SUBWORKERS  = 3
    TASKS       = 20
    SUBTASKS    = 10
)

func subworker(subtasks chan int) {
    for {
        task, ok := <-subtasks
        if !ok {
            return
        }
        time.Sleep(time.Duration(task) * time.Millisecond)
        fmt.Println(task)
    }
}

func worker(tasks <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasks
        if !ok {
            return
        }

        subtasks := make(chan int)
        for i := 0; i < SUBWORKERS; i++ {
            go subworker(subtasks)
        }
        for i := 0; i < SUBTASKS; i++ {
            task1 := task * i
            subtasks <- task1
        }
        close(subtasks)
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(WORKERS)
    tasks := make(chan int)

    for i := 0; i < WORKERS; i++ {
        go worker(tasks, &wg)
    }

    for i := 0; i < TASKS; i++ {
        tasks <- i
    }

    close(tasks)
    wg.Wait()
}


Servers pattern (similar to fan-out)

package main

import "net"

func handler(c net.Conn) {
    c.Write([]byte("ok"))
    c.Close()
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c)
    }
}

[Figure: servers]

Simplicity is complicated

Server pattern 2 (logger)

package main

import (
    "fmt"
    "net"
    "time"
)

func handler(c net.Conn, ch chan string) {
    ch <- c.RemoteAddr().String()
    c.Write([]byte("ok"))
    c.Close()
}

func logger(ch chan string) {
    for {
        fmt.Println(<-ch)
    }
}

func server(l net.Listener, ch chan string) {
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c, ch)
    }
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    ch := make(chan string)
    go logger(ch)
    go server(l, ch)
    time.Sleep(10 * time.Second)
}


Server + Worker pattern

package main

import (
    "net"
    "time"
)

func handler(c net.Conn, ch chan string) {
    addr := c.RemoteAddr().String()
    ch <- addr
    time.Sleep(100 * time.Millisecond)
    c.Write([]byte("ok"))
    c.Close()
}

func logger(wch chan int, results chan int) {
    for {
        data := <-wch
        data++
        results <- data
    }
}

func parse(results chan int) {
    for {
        <-results
    }
}

func pool(ch chan string, n int) {
    wch := make(chan int)
    results := make(chan int)
    for i := 0; i < n; i++ {
        go logger(wch, results)
    }
    go parse(results)
    for {
        addr := <-ch 
        l := len(addr)
        wch <- l
    }
}

func server(l net.Listener, ch chan string) {
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c, ch)
    }
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    ch := make(chan string)
    go pool(ch, 4)
    go server(l, ch)
    time.Sleep(10 * time.Second)
}

[Figure: servers3]

Concurrent Prime Sieve

package main

import "fmt"

func Generate(ch chan<- int) {
    for i := 2; ; i++ {
        ch <- i
    }
}

func Filter(in <-chan int, out chan<- int, prime int) {
    for {
        i := <-in
        if i % prime != 0 {
            out <- i
        }
    }
}

func main() {
    ch := make(chan int)
    go Generate(ch)
    for i := 0; i < 10; i++ {
        prime := <-ch
        fmt.Println(prime)
        ch1 := make(chan int)
        go Filter(ch, ch1, prime)
        ch = ch1
    }
}

[Figure: prime sieve]

Parallelism is not Concurrency

In short:

Parallelism is simply running things in parallel.
Concurrency is a way to structure your program.
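To make the distinction concrete, the sketch below is structured concurrently (two players passing a ball over a channel) but is limited to one thread running Go code at a time via `runtime.GOMAXPROCS(1)`, so nothing in it runs in parallel. The `rally` helper is a hypothetical variation on the ping-pong example, with a `done` channel added so it can stop.

```go
package main

import (
	"fmt"
	"runtime"
)

// rally bounces a counter between two goroutines until it reaches hits,
// then returns the final value of the counter.
func rally(hits int) int {
	table := make(chan int)
	done := make(chan int)
	player := func() {
		for ball := range table {
			if ball >= hits {
				done <- ball
				return
			}
			table <- ball + 1
		}
	}
	go player()
	go player()
	table <- 0
	result := <-done
	close(table) // the remaining player is waiting to receive and exits here
	return result
}

func main() {
	runtime.GOMAXPROCS(1) // at most one thread executes Go code at a time
	fmt.Println(rally(1000)) // prints 1000
}
```

The program still completes, because each channel operation that cannot proceed lets the scheduler run another goroutine. The structure is concurrent whether or not extra cores are available to run it in parallel.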

A picture is worth a thousand words

Parallelism

[Figure: parallelism1]

[Figure: parallelism2]

Concurrency

[Figure: concurrency1]

[Figure: concurrency2]

[Figure: concurrency3]

Rob Pike-Concurrency Is Not Parallelism
