Concurrent programs in Go rely on two things: goroutines and channels.
What is a goroutine?
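Goroutines are widely regarded as Go's implementation of coroutines. The book 《Go语言编程》 (Go Programming) describes a goroutine as a lightweight thread, that is, a coroutine (p. 90 of the original). In Chapter 9, on advanced topics, the authors repeat the point: "Fundamentally, a goroutine is Go's version of a coroutine" (p. 204). Rob Pike, one of Go's designers, puts it differently:
"A goroutine is a Go function or method executing concurrently in the same address space as other goroutines. A running program consists of one or more goroutines. It's not the same as a thread, coroutine, process, etc. It's a goroutine."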
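1. Goroutines vs. coroutines
Goroutines are supported natively by the language. Coroutines are usually provided by a library, and goroutines are more powerful than that approach because the Go runtime takes over much of their scheduling. One benefit is that when a goroutine blocks (for example, on synchronous I/O), the runtime automatically hands the CPU to other goroutines.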
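The sketch below illustrates the blocking behavior just described. It makes two assumptions that are not in the text above. First, it uses a channel receive as the blocking operation instead of synchronous I/O. Second, it restricts the program to one thread executing Go code with `runtime.GOMAXPROCS(1)`. The function name `blockingDoesNotStall` is made up for illustration.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// blockingDoesNotStall starts two goroutines. The first blocks on a channel
// receive; the second records a message and then releases the first.
// It returns the order in which the two goroutines recorded their messages.
func blockingDoesNotStall() []string {
	var mu sync.Mutex
	var order []string
	record := func(s string) {
		mu.Lock()
		order = append(order, s)
		mu.Unlock()
	}

	release := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		<-release // blocks here; the runtime parks this goroutine
		record("unblocked")
	}()
	go func() {
		defer wg.Done()
		record("other goroutine ran")
		close(release) // wakes the blocked goroutine
	}()
	wg.Wait()
	return order
}

func main() {
	// Only one OS thread may execute Go code at a time, so the second
	// goroutine can only run if the runtime switches away from the blocked one.
	runtime.GOMAXPROCS(1)
	fmt.Println(blockingDoesNotStall()) // [other goroutine ran unblocked]
}
```

The blocked goroutine does not hold on to the only thread. The runtime parks it and runs the other goroutine.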
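2. Goroutines vs. threads
A goroutine is not equivalent to a thread. Goroutines were introduced to replace threads as the smallest unit of scheduling, and the runtime multiplexes many goroutines onto a smaller number of OS threads. When a goroutine is to run, the runtime first tries the current thread. If that thread is blocked, the goroutine is assigned to an idle thread, and if there is no idle thread, a new one is created. Note that when a goroutine finishes, its thread does not exit; it becomes an idle thread that can be reused.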
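Goroutines are multiplexed onto threads and start with a small stack (a few kilobytes) that grows as needed. Because of that, launching very large numbers of them is routine. The sketch below is a minimal illustration of this. The function name `spawn` and the goroutine count are arbitrary choices for the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// spawn starts n goroutines that each increment a shared counter once,
// waits for all of them, and returns the final count.
func spawn(n int) int64 {
	var count int64
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt64(&count, 1) // atomic: many goroutines write concurrently
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&count)
}

func main() {
	// 100,000 goroutines; the runtime runs them on a handful of OS threads.
	fmt.Println(spawn(100000)) // 100000
}
```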
Using goroutines
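```go
package main

import (
	"fmt"
	"time"
)

// ready sleeps for sec seconds and then prints a message.
func ready(w string, sec int64) {
	time.Sleep(time.Duration(sec) * time.Second)
	fmt.Println(w, "is ready!")
}

func main() {
	go ready("Tea", 2) // runs concurrently with main
	go ready("Coffee", 1)
	fmt.Println("I'm waiting")
	time.Sleep(5 * time.Second) // keep main alive long enough for both goroutines
}
```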
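Why does main need to sleep? When main returns, the program exits immediately without waiting for other goroutines. Without the Sleep, neither "is ready!" line would be printed. Sleeping is only a crude fix, because it guesses how long the goroutines need.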
Using channels
A channel is the conduit through which goroutines communicate.
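```go
package main

import (
	"fmt"
	"time"
)

var c chan int // shared channel used to signal completion

func ready(w string, sec int) {
	time.Sleep(time.Duration(sec) * time.Second)
	fmt.Println(w, "is ready!")
	c <- 1 // tell main this goroutine is done
}

func main() {
	c = make(chan int)
	go ready("Tea", 2)
	go ready("Coffee", 1)
	fmt.Println("I'm waiting, but not too long")
	<-c // wait for the first goroutine to finish
	<-c // wait for the second goroutine to finish
}
```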
Channels in more depth
Channels come in two kinds: buffered and unbuffered. A channel is unbuffered by default.
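```go
c1 := make(chan int)      // unbuffered
c2 := make(chan int, 0)   // also unbuffered: capacity 0 is the default
c3 := make(chan int, 100) // buffered, with room for 100 values
```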
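For a buffered channel, remember that the send ("put") happens before the corresponding receive ("take") completes. This rule holds for every channel.
For an unbuffered channel, remember that the receive also happens before the corresponding send completes ("take" before "put"): the sender blocks until a receiver has taken the value.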
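The sketch below shows both behaviors side by side. A buffered channel accepts sends with no receiver until it is full, which `len` and `cap` make visible. An unbuffered send only completes once another goroutine receives. The function names are hypothetical.

```go
package main

import "fmt"

// bufferedLenCap sends two values into a channel with capacity 3.
// No receiver is needed because the buffer has room.
func bufferedLenCap() (length, capacity int) {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	return len(ch), cap(ch)
}

// unbufferedHandoff runs the sender in its own goroutine, because a send on
// an unbuffered channel blocks until a receiver takes the value.
func unbufferedHandoff() int {
	ch := make(chan int)
	go func() {
		ch <- 42 // blocks until the receive below is ready
	}()
	return <-ch
}

func main() {
	l, c := bufferedLenCap()
	fmt.Println(l, c)                // 2 3
	fmt.Println(unbufferedHandoff()) // 42

	// By contrast, these two lines on their own in main would stop the program
	// with "fatal error: all goroutines are asleep - deadlock!":
	//   ch := make(chan int)
	//   ch <- 1
}
```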
Visualizing concurrent programming
1. One channel, one goroutine: one writer, one reader
```go
package main

func main() {
	ch := make(chan int)
	go func() {
		ch <- 42 // writer: blocks until main is ready to receive
	}()
	<-ch // reader: receives 42
}
```
2. Adding a timer
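```go
package main

import "time"

// timer returns a channel that receives a value once d has elapsed.
func timer(d time.Duration) <-chan int {
	c := make(chan int)
	go func() {
		time.Sleep(d)
		c <- 1
	}()
	return c
}

func main() {
	for i := 0; i < 24; i++ {
		// i is an int, so convert it before multiplying by a Duration.
		c := timer(time.Duration(i) * time.Second)
		<-c
	}
}
```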
3. Ping-pong
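```go
package main

import "time"

func main() {
	var ball int
	table := make(chan int)
	go player(table)
	go player(table)

	table <- ball               // serve: put the ball on the table
	time.Sleep(1 * time.Second) // let the players rally for a second
	<-table                     // take the ball off the table, ending the game
}

// player repeatedly takes the ball, hits it (increments it), and puts it back.
func player(table chan int) {
	for {
		ball := <-table
		ball++
		time.Sleep(100 * time.Millisecond)
		table <- ball
	}
}
```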
With three players:
```go
go player(table)
go player(table)
go player(table)
```
With 100 players:
```go
for i := 0; i < 100; i++ {
	go player(table)
}
```
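Goroutines waiting to receive from the same channel are served in FIFO order. Strictly speaking, this is how the current runtime behaves; the language specification does not guarantee it.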
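The ping-pong program above stops the game with a one-second sleep. The sketch below is a variant that ends after a fixed number of hits and records how often each player hit the ball. The `play` function, the `done` channel, and the per-player counters are all additions made for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// play runs a ping-pong game: `players` goroutines (players >= 2) share one
// table channel and pass a single ball until it has been hit `hits` times
// (hits >= 1). It returns how many times each player hit the ball.
func play(players, hits int) []int {
	table := make(chan int)
	done := make(chan struct{})
	counts := make([]int, players) // each goroutine writes only its own index
	var wg sync.WaitGroup
	for id := 0; id < players; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for ball := range table {
				ball++
				counts[id]++
				if ball == hits {
					close(done) // final hit: end the game instead of passing on
					return
				}
				table <- ball
			}
		}(id)
	}
	table <- 0 // serve
	<-done
	close(table) // releases the players still waiting in range table
	wg.Wait()
	return counts
}

func main() {
	fmt.Println(play(3, 30))
}
```

How the hits are spread across players depends on the scheduler. Only the total, which always equals `hits`, is guaranteed.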
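4. Fan-in
Fan-in is a popular pattern in concurrent programming; its opposite is fan-out.
A fan-in function reads from multiple inputs and multiplexes them onto a single channel.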
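Note: software engineering uses "fan-in" in a different sense. There, a module's fan-in is the number of higher-level modules that call it. Higher fan-in means the module is shared by more callers, which is generally desirable. High fan-in should not be pursued at any cost, though. Lumping unrelated functions into one module raises its fan-in but inevitably lowers its cohesion, and that should be avoided.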
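```go
package main

import (
	"fmt"
	"time"
)

// producer sends an increasing sequence of integers on ch, one every d.
func producer(ch chan int, d time.Duration) {
	var i int
	for {
		ch <- i
		i++
		time.Sleep(d)
	}
}

// reader prints everything that arrives on out.
func reader(out chan int) {
	for x := range out {
		fmt.Println(x)
	}
}

func main() {
	ch := make(chan int)
	out := make(chan int)
	// Two producers write to the same channel, so their outputs are fanned in.
	go producer(ch, 100*time.Millisecond)
	go producer(ch, 100*time.Millisecond)
	go reader(out)
	// Forward the merged stream to the reader; runs until interrupted.
	for i := range ch {
		out <- i
	}
}
```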
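In the program above, the producers never stop, so nothing is ever closed. The sketch below is a common fan-in variant, modeled loosely on the `merge` function in the Go blog's pipelines article. It takes any number of input channels and closes its output once every input has been drained. The `gen` helper is an assumption used to produce finite inputs.

```go
package main

import (
	"fmt"
	"sync"
)

// gen returns a channel that emits vals and is then closed.
func gen(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// merge fans in any number of input channels onto one output channel and
// closes the output once every input has been drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait() // all forwarders have finished
		close(out)
	}()
	return out
}

func main() {
	sum := 0
	for v := range merge(gen(1, 2, 3), gen(10, 20)) {
		sum += v
	}
	fmt.Println(sum) // 36
}
```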
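5. Workers (fan-out)
This is the opposite of fan-in: several functions read tasks from a single channel. The work is spread across goroutines that the runtime can schedule onto different CPU cores.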
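```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker processes tasks from taskCh until the channel is closed.
func worker(taskCh <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, ok := <-taskCh
		if !ok {
			return // channel closed and drained
		}
		d := time.Duration(task) * time.Millisecond
		time.Sleep(d) // simulate work proportional to the task value
		fmt.Println("processing task", task)
	}
}

// pool starts `workers` workers and feeds them `tasks` tasks.
func pool(wg *sync.WaitGroup, workers, tasks int) {
	taskCh := make(chan int)
	for i := 0; i < workers; i++ {
		go worker(taskCh, wg)
	}
	for i := 0; i < tasks; i++ {
		taskCh <- i
	}
	close(taskCh) // each worker's receive now reports ok == false
}

func main() {
	var wg sync.WaitGroup
	wg.Add(36) // one per worker
	go pool(&wg, 36, 50)
	wg.Wait()
}
```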
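The workers above only print. When the results are needed, fan-out is usually paired with a fan-in of results. The sketch below does that: the function `sumOfSquares` and the squaring "work" are stand-ins chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares fans tasks out to `workers` goroutines (workers >= 1), each
// squaring the numbers it receives, then fans the results back in and sums them.
func sumOfSquares(tasks []int, workers int) int {
	taskCh := make(chan int)
	results := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for t := range taskCh {
				results <- t * t
			}
		}()
	}
	// Feed tasks from a separate goroutine so results can be read meanwhile.
	go func() {
		for _, t := range tasks {
			taskCh <- t
		}
		close(taskCh)
	}()
	// Close results once every worker has exited.
	go func() {
		wg.Wait()
		close(results)
	}()
	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4}, 3)) // 30
}
```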
A more complex example (workers with sub-workers):
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	WORKERS    = 5
	SUBWORKERS = 3
	TASKS      = 20
	SUBTASKS   = 10
)

// subworker processes subtasks until the channel is closed.
func subworker(subtasks <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, ok := <-subtasks
		if !ok {
			return
		}
		time.Sleep(time.Duration(task) * time.Millisecond)
		fmt.Println(task)
	}
}

// worker splits each task into SUBTASKS subtasks and hands them to its own
// group of SUBWORKERS subworkers.
func worker(tasks <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, ok := <-tasks
		if !ok {
			return
		}
		subtasks := make(chan int)
		var subWg sync.WaitGroup
		subWg.Add(SUBWORKERS)
		for i := 0; i < SUBWORKERS; i++ {
			go subworker(subtasks, &subWg)
		}
		for i := 0; i < SUBTASKS; i++ {
			task1 := task * i
			subtasks <- task1
		}
		close(subtasks)
		subWg.Wait() // without this, main could exit before the last subtasks print
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(WORKERS)
	tasks := make(chan int)
	for i := 0; i < WORKERS; i++ {
		go worker(tasks, &wg)
	}
	for i := 0; i < TASKS; i++ {
		tasks <- i
	}
	close(tasks)
	wg.Wait()
}
```
Server pattern (similar to fan-out)
```go
package main

import "net"

// handler replies "ok" and closes the connection.
func handler(c net.Conn) {
	c.Write([]byte("ok"))
	c.Close()
}

func main() {
	l, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}
	for {
		c, err := l.Accept()
		if err != nil {
			continue
		}
		go handler(c) // one goroutine per connection
	}
}
```
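The sketch below exercises the server pattern end to end. It listens on an OS-chosen loopback port (`"127.0.0.1:0"`, an assumption made so the example does not depend on port 5000 being free), connects once as a client, and reads the reply. One difference from the server above: its accept loop returns on error instead of `continue`-ing, because after the listener is closed `Accept` fails on every call and the loop would spin. The helper name `roundTrip` is hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// handler is the same as in the server above: reply "ok" and hang up.
func handler(c net.Conn) {
	c.Write([]byte("ok"))
	c.Close()
}

// roundTrip starts an accept loop on a loopback port, connects once as a
// client, and returns what the server sent.
func roundTrip() (string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0") // port 0: let the OS pick
	if err != nil {
		return "", err
	}
	defer l.Close() // makes Accept return an error, ending the loop below

	go func() {
		for {
			c, err := l.Accept()
			if err != nil {
				return // listener closed; stop instead of spinning
			}
			go handler(c)
		}
	}()

	conn, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	reply, err := io.ReadAll(conn) // reads until the server closes the connection
	return string(reply), err
}

func main() {
	fmt.Println(roundTrip()) // ok <nil>
}
```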
Server pattern 2 (with a logger)
```go
package main

import (
	"fmt"
	"net"
	"time"
)

// handler sends the client's address to the logger, replies "ok", and hangs up.
func handler(c net.Conn, ch chan string) {
	ch <- c.RemoteAddr().String()
	c.Write([]byte("ok"))
	c.Close()
}

// logger prints every address it receives.
func logger(ch chan string) {
	for {
		fmt.Println(<-ch)
	}
}

func server(l net.Listener, ch chan string) {
	for {
		c, err := l.Accept()
		if err != nil {
			continue
		}
		go handler(c, ch)
	}
}

func main() {
	l, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}
	ch := make(chan string)
	go logger(ch)
	go server(l, ch)
	time.Sleep(10 * time.Second) // serve for 10 seconds, then exit
}
```
Server + worker pattern
```go
package main

import (
	"net"
	"time"
)

// handler reports the client's address, then replies "ok" after a short delay.
func handler(c net.Conn, ch chan string) {
	addr := c.RemoteAddr().String()
	ch <- addr
	time.Sleep(100 * time.Millisecond)
	c.Write([]byte("ok"))
	c.Close()
}

// logger is a worker: it takes a value from wch, processes it (here it just
// increments it), and sends the result on results.
func logger(wch chan int, results chan int) {
	for {
		data := <-wch
		data++
		results <- data
	}
}

// parse consumes and discards the results.
func parse(results chan int) {
	for {
		<-results
	}
}

// pool starts n logger workers and feeds them the length of each address.
func pool(ch chan string, n int) {
	wch := make(chan int)
	results := make(chan int)
	for i := 0; i < n; i++ {
		go logger(wch, results)
	}
	go parse(results)
	for {
		addr := <-ch
		l := len(addr)
		wch <- l
	}
}

func server(l net.Listener, ch chan string) {
	for {
		c, err := l.Accept()
		if err != nil {
			continue
		}
		go handler(c, ch)
	}
}

func main() {
	l, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}
	ch := make(chan string)
	go pool(ch, 4)
	go server(l, ch)
	time.Sleep(10 * time.Second) // serve for 10 seconds, then exit
}
```
Concurrent Prime Sieve
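```go
package main

import "fmt"

// Generate sends the sequence 2, 3, 4, ... to channel ch.
func Generate(ch chan<- int) {
	for i := 2; ; i++ {
		ch <- i
	}
}

// Filter copies values from in to out, removing those divisible by prime.
func Filter(in <-chan int, out chan<- int, prime int) {
	for {
		i := <-in
		if i%prime != 0 {
			out <- i
		}
	}
}

// main chains Filter goroutines together: each prime found adds a new
// filter, and the first value to reach the end of the chain is the next prime.
func main() {
	ch := make(chan int)
	go Generate(ch)
	for i := 0; i < 10; i++ {
		prime := <-ch
		fmt.Println(prime)
		ch1 := make(chan int)
		go Filter(ch, ch1, prime)
		ch = ch1
	}
}
```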
Parallelism is not Concurrency
In short:
Parallelism is simply running things in parallel.
Concurrency is a way to structure your program.
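A small sketch of the distinction. `sumConcurrently` (a hypothetical helper) is structured concurrently, with one goroutine per chunk. That structure stays the same whether `runtime.GOMAXPROCS` is 1, so that only one thread executes Go code at a time and nothing runs in parallel, or the number of CPUs, so that chunks may run in parallel.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// sumConcurrently splits nums into at most `parts` chunks (parts >= 1),
// sums each chunk in its own goroutine, and adds up the partial sums.
func sumConcurrently(nums []int, parts int) int {
	size := (len(nums) + parts - 1) / parts // ceiling division
	partial := make(chan int, parts)        // room for every partial sum
	var wg sync.WaitGroup
	for start := 0; start < len(nums); start += size {
		end := start + size
		if end > len(nums) {
			end = len(nums)
		}
		wg.Add(1)
		go func(chunk []int) {
			defer wg.Done()
			s := 0
			for _, v := range chunk {
				s += v
			}
			partial <- s
		}(nums[start:end])
	}
	wg.Wait()
	close(partial)
	total := 0
	for s := range partial {
		total += s
	}
	return total
}

func main() {
	nums := make([]int, 100)
	for i := range nums {
		nums[i] = i + 1
	}
	// One thread executing Go code: concurrent, but not parallel.
	runtime.GOMAXPROCS(1)
	fmt.Println(sumConcurrently(nums, 4)) // 5050
	// All CPUs available: the same program may now run chunks in parallel.
	runtime.GOMAXPROCS(runtime.NumCPU())
	fmt.Println(sumConcurrently(nums, 4)) // 5050
}
```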
A picture is worth a thousand words:
[Figures: parallelism vs. concurrency]
Rob Pike, "Concurrency Is Not Parallelism" (talk)