并发

前言

在学习 Go 的并发之前,先复习一下操作系统的基础知识。

并发与并行

先来理一理并发与并行的区别。

  • 并行:指的是在同一时间,多个程序在不同的 CPU 上共同运行,互相之间并没有对 CPU 资源进行竞争。比如,我在看书的时候,左手用来翻书,右手做笔记,两者可以同时进行。

  • 并发:如果系统只有一个 CPU,有多个程序要运行,系统只能将 CPU 的时间划分为多个时间片,然后分配给不同的程序。比如,我看书的时候,只能用右手翻完书之后,才能腾出手来做笔记。

可是明确的是并发≠并行,但是只要 CPU 运行足够快,每个时间片划分足够小,就会给人们造成一种假象,认为计算机在同一时刻做了多个事情。

进程、线程、协程

进程是一个程序执行的过程,也是系统进行资源分配和调度的基本单位。简单来说,一个进程就是我们电脑上某个独立运行的程序。

而线程是系统能够调度的最小单位,它被包含在进程里面,是进程中的实际运作单位,一个进程可以包含多个线程。可以将进程理解为一个工厂,而工厂里面的工人就是线程。就像工厂里面必须要有一个工人才能工作一样,每个进程里面也必须有一个线程才能工作。比如,JavaScript 就被成为单线程的语言,说明 JavaScript 工厂里面只有一个打工人,这个打工人就是工头,称为主线程。多线程的进程中也会有一个主线程,主线程一般随着进程一起创建和销毁。

进程与线程都是操作系统上的概念,程序中如果要进行进程或者线程的切换,在切换的过程中,需要先保存当线程的状态,然后恢复另一个线程的状态,这是需要耗费时间的,如果是进程的切换还可能跨 CPU,无法利用 CPU 缓存,导致进程比线程的切换成本更加高昂。

所以,除了系统级别的内核线程外,一些程序中创建了用户线程这一说,这么做可以减少与操作系统交互,将线程的切换控制在程序内,这种用户态的线程被称为协程。用户线程的切换完全由程序控制,实际上使用的内核线程就只存在一个,内核线程与用户线程之间的关系为一对多。虽然这样做可以减少线程上下文切换带来的开销,但是,无法避免阻塞的问题。一旦某个用户线程被阻塞会导致内核线程的阻塞,无法进行用户线程进行切换,从而整个进程都被挂起,

协程

Go 语言中的线程模型既不是使用内核线程,也不是完全的用户线程,而是一种混合型的线程模型。用户线程与内核线程的对应关系为多对多,用户线程与内核线程动态关联,当某个线程出现阻塞的时候,可以动态切换到另外的内核线程上。

G-P-M模型

上面只是 Go 语言中抽象层面的线程模型,具体是如何进行线程调度的,还是看看 Go 语言的代码。

func log(msg string) {  fmt.Println(msg) } func main() {  log("hello")  go log("world") }

之前的文章介绍过,Go 程序在运行时,默认以 main 函数为入口,main 函数中运行的代码会到一个 goroutine 中运行。如果我们在调用的函数前,加上一个 go 关键词,那么这个函数就放到另外一个 goroutine 中运行。

这里说的 goroutine 就是 Go 语言中的用户线程,也就是协程。Go 语言在运行时,会建立一个 G-P-M 模型,这个模型专门负责 goroutine 的调度。

  • G:gotoutine(用户线程);

  • P:processor(逻辑处理器);

  • M:machine(机器资源);

每个 goroutine 都会放到一个 goroutine 队列中,由于是用户自主创建,上下文的切换成本极低。P(processor)的主要作用是管理用户线程,将 goroutine 合理的安排到内核线程上,也就是这个模型的 M。通常情况下,G 的数量远远多于 M。

Goroutine

如果你有运行过上面的代码,你会发现,go 关键词后的函数并没有真正执行。

func log(msg string) {  fmt.Println(msg) } func main() {  log("hello")  go log("world") }

运行后,终端只输出了 hello,并没有输出 world。

这是因为 main 函数会在主 goroutine 中运行,类似于主线程,而每个 go 语句会启动一个新的 goroutine,启动后的 goroutine 并不会直接执行,而是会放入一个 G 队列中,等待 P 的分配。但是主 goroutine 结束后,就意味着程序结束了,G 队列中的 goroutine 还没有等到执行时间。所以,go 语句后的函数是一个异步的函数,go 语句调用后,会立即去执行后面的语句,而不会等待 go 语句后的函数执行。

如果要 world 输出,我们可以在 main 函数后面加一个休眠,延长主 goroutine 的执行时间。

import (  "fmt"  "time" ) func log(msg string) {  fmt.Println(msg) } func main() {  fmt.Println()  log("hello")  go log("world")  time.Sleep(time.Millisecond * 500) }

通道

多线程编程中,由于各个线程之间需要共享数据,一般采用的是共享内存的方案。但是这么做,势必会出现多个线程同时修改同一份数据情况,为了保证数据的安全性,需要为数据加锁,处理起来就比较麻烦。

所以在 Go 语言社区有一句名言:

不要通过共享内存来通信,而应该通过通信来共享内存。

创建通道

这里说的通信的方式,就是 Go 语言中的通道(channel)。通道是 Go 语言中的一种特殊类型,需要通过 make 方法创建一个通道。

ch := make(chan int) // 创建一个 int 类型的通道

创建通道的时候,需要加上一个类型,表示该通道传输数据的类型。也可以通过指定一个空接口的方式,创建一个可以传送任意数据的通道。

ch := make(chan interface{})

创建的通道分为无缓存通道和有缓存通道,make 方法的第二个参数表示可缓存的数量(如果传入 0,效果和不传一样)。

ch := make(chan string, 0) // 无缓存通道,传入 ch := make(chan string, 1)

发送和接收数据

通道创建后,通过 <- 符号来接收和发送数据。

ch := make(chan string) ch <- "hello world" // 发送一个字符串 msg := <- ch // 接收之前发送的字符串

实际在这个代码运行的时候,会提示一个错误。

fatal error: all goroutines are asleep - deadlock!

表明当前的 goroutine 处于挂起状态,并且后续不会有响应,只能直接中断程序。因为这里创建的是无缓存通道,发送数据后通道不会将数据缓存在通道中,导致后面要找通道要数据的时候无法正常从通道中获取数据。我们可以将通道的缓存设置为 1,让通道可以缓存一个数据在里面。

ch := make(chan string, 1) ch <- "hello world" // 发送一个字符串 msg := <- ch // 接收之前发送的字符串 fmt.Println(msg)

但是如果发送的数据超出了缓存数量,或者接受数据时,缓存里面已经没有数据了,依然会报错。

ch := make(chan string, 1) ch <- "hello world" ch <- "hello world"  // fatal error: all goroutines are asleep - deadlock!
ch := make(chan string, 1) ch <- "hello world" <- ch <- ch  // fatal error: all goroutines are asleep - deadlock!

协程中使用通道

那么无缓存的通道中,应该怎么发送和接收数据呢?这就需要将通道与协程进行结合,也就是 Go 语言中常用的并发的开发模式。

无缓存的通道在收发数据时,由于一次只能同步的发送一个数据,会在两个 goroutine 间反复横跳,通道在接受数据时,会阻塞当前 goroutine,直到通道在另一个 goroutine 发送了数据。

ch := make(chan string) // 创建一个无缓存通道 temp := "我在地球" go func () {    // 接收一个字符串  ch <- "hello world"  temp = "进入了异次元" }() // 运行到这里会被阻塞 // 直到通道在另一个 goroutine 发送了数据 msg := <- ch fmt.Println(msg) fmt.Println("temp =>", temp)

为了证明通道在接收数据时会被阻塞,我们可以在前面加上一个 temp 变量,然后在另外的 goroutine 中修改这个变量,看最后输出的值是否被修改,以此证明通道在接受数据时是否发生了阻塞。

运行结果已经证明,当通道接收数据时,阻塞了主 goroutine 的执行。除了主动的从通道里面一条条的获取数据,还可以通过 range 的方式循环获取数据。

ch := make(chan string)  go func() {   for i := 0; i < 5; i++ {     ch <- fmt.Sprintf("数据 %d", i)   }   close(ch) }()  for data := range ch {   fmt.Println("接收 =>", data) }

如果使用 range 循环读取通道中的数据时,在数据发送完毕时,需要调用 close(ch) ,将通道关闭。

实战

在了解了前面的基础知识之后,我们可以通过协程 + 通道的写一段爬虫,来实战一下 Go 语言的并发能力。

首先确定爬虫需要爬取的网站,由于个人比较喜欢看电影,所以决定爬一爬豆瓣的电影 TOP 榜单。

其域名为 https://movie.douban.com/top250,翻到第二页后,域名为 https://movie.douban.com/top250?start=25 ,第三页的域名为 https://movie.douban.com/top250?start=50,说明每次这个 TOP 榜单每页会有 25 部电影,每次翻页就给 start 参数加上 25。

const limit = 25 // 每页的数量为 25 const total = 100 // 爬取榜单的前 100 部电影 const page = total / limit // 需要爬取的页数  func main() {  var start int  var url string  for i :=0; i < page; i++ {     start := i * limit     // 计算得到所有的域名     url := "https://movie.douban.com/top250?start=" + strconv.Itoa(start)  } }

然后,我们可以构造一个 fetch 函数,用于请求对应的页面。

func fetch(url string) {   // 构造请求体  req, _ := http.NewRequest("GET", url, nil)   // 由于豆瓣会校验请求的 Header   // 如果没有 User-Agent,http code 会返回 418  req.Header.Add("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36")    // 发送请求  client := &http.Client{}  rsp, _ := client.Do(req)    // 断开连接  defer rsp.Body.Close() }  func main() {  for i :=0; i < page; i++ {     url := ……   go fetch(url, ch)  } }

然后使用 goquery 来解析 HTML,提取电影的排名以及电影名。

// 第二个参数为与主goroutine 沟通的通道 func fetch(url string, ch chan string) {   // 省略部分代码 ……  rsp, _ := client.Do(req)   // 断开连接  defer rsp.Body.Close()   // 解析 HTML  doc, _ := goquery.NewDocumentFromReader(rsp.Body)  // 提取 HTML 中的电影排行与电影名称  doc.Find(".item").Each(func(_ int, s *goquery.Selection) {   num := s.Find(".pic em").Text()   title := s.Find(".title::first-child").Text()     // 将电影排行与名称写入管道中   ch <- fmt.Sprintf("top %s: %s\n", num, title)  }) }

最后,在主 goroutine 中创建通道,以及接收通道中的数据。

func main() {   ch := make(chan string)   for i :=0; i < page; i++ {     url := ……   go fetch(url, ch)  }   for i :=0; i < total; i++ {   top := <- ch // 接收数据   fmt.Println(top)  } }

最后的执行结果如下:

可以看到由于是并发执行,输出的顺序是乱序。

完整代码

package main  import (  "fmt"  "github.com/PuerkitoBio/goquery"  "net/http"  "strconv" )  const limit = 25 const total = 100 const page = total / limit  func fetch(url string, ch chan string) {  req, _ := http.NewRequest("GET", url, nil)  req.Header.Add("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36")   client := &http.Client{}  rsp, _ := client.Do(req)   defer rsp.Body.Close()   doc, _ := goquery.NewDocumentFromReader(rsp.Body)   doc.Find(".item").Each(func(_ int, s *goquery.Selection) {   num := s.Find(".pic em").Text()   title := s.Find("span.title::first-child").Text()   ch <- fmt.Sprintf("top %s: %s\n", num, title)  }) }  func main() {  ch := make(chan string)   for i :=0; i < page; i++ {   start := i * limit   url := "https://movie.douban.com/top250?start=" + strconv.Itoa(start)   go fetch(url, ch)  }   for i :=0; i < total; i++ {   top := <- ch   fmt.Println(top)  } }