跳至主要內容

协程并发

Mr.Liu大约 8 分钟Go

协程并发

goroutine

介绍

在 Go 中,协程(goroutine)是一种轻量级的并发执行单位,允许你在同一个程序中同时执行多个函数,而无需显式地创建多个线程。协程是 Go 并发模型的核心组成部分,它们更加高效地利用了系统资源,使并发编程更加简洁和易于管理。

协程的特点和优势:

  1. 轻量级: 协程的创建和销毁开销很小,相比线程,可以创建更多的协程。
  2. 低成本: 在 Go 中,创建协程的成本远低于创建线程。
  3. 并发执行: 协程允许多个函数同时执行,但是它们在一个或多个线程上运行,由 Go 运行时调度管理。
  4. 通信通过共享内存: 协程之间通过共享内存来通信,但是 Go 也提供了通道(channel)作为一种更安全、更强大的通信机制。
  5. 高度抽象: Go 的并发模型使得编写并发程序变得更加简洁和直观。

案例

演示了如何使用协程并发地下载多个网页内容

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
)

func fetchURL(url string, wg *sync.WaitGroup) {
    defer wg.Done() // 通知 WaitGroup,协程已完成

    resp, err := http.Get(url)
    if err != nil {
        fmt.Printf("Error fetching %s: %s\n", url, err)
        return
    }
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Printf("Error reading response body: %s\n", err)
        return
    }

    fmt.Printf("Fetched %s, Length: %d\n", url, len(body))
}

func main() {
    urls := []string{
        "https://www.example.com",
        "https://www.google.com",
        "https://www.openai.com",
        "https://www.wikipedia.org",
    }

    var wg sync.WaitGroup

    for _, url := range urls {
        wg.Add(1) // 增加 WaitGroup 计数器
        go fetchURL(url, &wg) // 启动协程
    }

    wg.Wait() // 等待所有协程完成
    fmt.Println("All downloads completed.")
}

在这个示例中,我们使用 sync.WaitGroup​ 来等待所有协程完成。遍历一组网址,并为每个网址启动一个协程。每个协程负责下载一个网页内容,并在完成时通过 Done()​ 方法通知 WaitGroup​。

channel

管道是Golang在语言级别上提供的goroutine间的通讯方式,我们可以使用channel在多个goroutine之间传递消息。如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Golang的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

Go语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

channel类型

channel是一种类型,一种引用类型。声明管道类型的格式如下:

// 声明一个传递整型的管道
var ch1 chan int
// 声明一个传递布尔类型的管道
var ch2 chan bool
// 声明一个传递int切片的管道
var ch3 chan []int

创建channel

声明管道后,需要使用make函数初始化之后才能使用

make(chan 元素类型, 容量)

举例如下:

// 创建一个能存储10个int类型的数据管道
ch1 = make(chan int, 10)
// 创建一个能存储4个bool类型的数据管道
ch2 = make(chan bool, 4)
// 创建一个能存储3个[]int切片类型的管道
ch3 = make(chan []int, 3)

channel操作

管道有发送,接收和关闭的三个功能

发送和接收 都使用 <- 符号

现在我们先使用以下语句定义一个管道:

ch := make(chan int, 3)

发送

将数据放到管道内,将一个值发送到管道内

// 把10发送到ch中
ch <- 10

取操作

x := <- ch

关闭管道.

通过调用内置的close函数来关闭管道

close(ch)

完整示例

// 创建管道
ch := make(chan int, 3)

// 给管道里面存储数据
ch <- 10
ch <- 21
ch <- 32

// 获取管道里面的内容
a := <- ch
fmt.Println("打印出管道的值:", a)
fmt.Println("打印出管道的值:", <- ch)
fmt.Println("打印出管道的值:", <- ch)

// 管道的值、容量、长度
fmt.Printf("地址:%v 容量:%v 长度:%v \n", ch, cap(ch), len(ch))

// 管道的类型
fmt.Printf("%T \n", ch)

// 管道阻塞(当没有数据的时候取,会出现阻塞,同时当管道满了,继续存也会)
<- ch  // 没有数据取,出现阻塞
ch <- 10
ch <- 10
ch <- 10
ch <- 10 // 管道满了,继续存,也出现阻塞

通道的原理:

通道的底层实现使用了 Go 运行时的调度机制,它们是并发安全的数据结构。当协程发送数据到通道时,数据会被复制到通道中,而不是直接传递指针。当协程从通道接收数据时,数据会从通道中移出。Go 运行时确保在合适的时机执行数据的复制和移动,从而避免了竞争条件和并发冲突。

goroutine和channel

package main

import (
    "fmt"
    "math"
    "sync"
)

func putNum(intChan chan int) {
    for i := 2; i <= 120000; i++ {
        intChan <- i
    }
    close(intChan)
}

func primeNum(intChan chan int, primeChan chan int) {
    for value := range intChan {
        var flag = true
        for i := 2; i <= int(math.Sqrt(float64(value))); i++ {
            if value%i == 0 {
                flag = false
                break
            }
        }
        if flag {
            primeChan <- value
        }
    }
}

func printPrime(primeChan chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for value := range primeChan {
        fmt.Println(value)
    }
}

var wg sync.WaitGroup

func main() {
    intChan := make(chan int, 1000)
    primeChan := make(chan int, 1000)

    go putNum(intChan)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go primeNum(intChan, primeChan)
    }

    wg.Add(1)
    go printPrime(primeChan, &wg)

    wg.Wait()
    fmt.Println("主线程执行完毕")
}

单向管道

有时候我们会将管道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中,使用管道都会对其进行限制,比如限制管道在函数中只能发送或者只能接受

默认的管道是 可读可写

// 定义一种可读可写的管道
var ch = make(chan int, 2)
ch <- 10
<- ch

// 管道声明为只写管道,只能够写入,不能读
var ch2 = make(chan<- int, 2)
ch2 <- 10

// 声明一个只读管道
var ch3 = make(<-chan int, 2)
<- ch3

Select多路复用

在某些场景下我们需要同时从多个通道接收数据。这个时候就可以用到golang中给我们提供的select多路复用。 通常情况通道在接收数据时,如果没有数据可以接收将会发生阻塞。

select的使用类似于switch 语句,它有一系列case分支和一个默认的分支。每个case会对应一个管道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。

select {
case <-channel1:
    // 执行 channel1 接收操作
case channel2 <- value:
    // 执行 channel2 发送操作
default:
    // 当没有通道操作可以进行时执行的语句
}

使用select语法模拟一个网络监控程序:

package main

import (
    "fmt"
    "net/http"
    "time"
)

func checkService(serviceName string, url string, statusChan chan<- string) {
    for {
        _, err := http.Get(url)
        if err != nil {
            statusChan <- fmt.Sprintf("[%s] is down", serviceName)
        } else {
            statusChan <- fmt.Sprintf("[%s] is up", serviceName)
        }
        time.Sleep(time.Second * 5)
    }
}

func main() {
    httpStatusChan := make(chan string)
    dbStatusChan := make(chan string)

    go checkService("HTTP Server", "http://localhost:8080", httpStatusChan)
    go checkService("Database Server", "http://localhost:5432", dbStatusChan)

    for {
        select {
        case httpStatus := <-httpStatusChan:
            fmt.Println(httpStatus)
        case dbStatus := <-dbStatusChan:
            fmt.Println(dbStatus)
        }
    }
}

tip:使用select来获取数据的时候,不需要关闭chan,不然会出现问题

Go中的并发安全和锁

在 Go 语言中,并发锁用于控制对共享资源的访问,以避免多个 goroutine 同时修改共享资源而引发的数据竞争和不一致性问题。Go 提供了两种主要类型的并发锁:互斥锁(Mutex)和读写锁(RWMutex)。

互斥锁(Mutex):

互斥锁用于在同一时刻只允许一个 goroutine 访问被保护的资源。它实现了基本的互斥机制,确保任何时刻只有一个 goroutine 可以进入临界区。

工作原理:

当一个 goroutine 需要访问共享资源时,它会尝试获取互斥锁。如果锁当前未被其他 goroutine 持有,那么该 goroutine 将获得锁并可以安全地执行操作。如果锁已经被其他 goroutine 持有,那么当前 goroutine 将被阻塞,直到锁被释放为止。

示例:

下面是一个使用互斥锁的简单示例,展示了如何安全地对共享资源进行操作:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    counter int
    mutex   sync.Mutex
)

func increment() {
    mutex.Lock()
    defer mutex.Unlock()
    counter++
    fmt.Println("Counter:", counter)
}

func main() {
    for i := 0; i < 5; i++ {
        go increment()
    }

    time.Sleep(time.Second)
}

在这个例子中,sync.Mutex​ 用于创建一个互斥锁。多个 goroutine 同时调用 increment​ 函数,但由于使用了互斥锁,每次只有一个 goroutine 能够获得锁并修改 counter​ 变量,从而避免了竞争条件。

读写锁(RWMutex):

读写锁允许多个 goroutine 同时读取共享资源,但只允许一个 goroutine 写入共享资源。这对于读操作远远超过写操作的情况非常有用,因为它可以提高并发性能。

工作原理:

读写锁维护了两个状态:读锁定和写锁定。多个 goroutine 可以同时获取读锁定,但只有一个 goroutine 可以获取写锁定。当写锁定被持有时,不允许获取读锁定,以避免写入和读取同时进行的情况。

示例:

下面是一个使用读写锁的示例,演示了读取和写入共享资源时的情况:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    data    map[string]string
    rwMutex sync.RWMutex
)

func readData(key string) {
    rwMutex.RLock()
    defer rwMutex.RUnlock()
    fmt.Println("Reading:", data[key])
}

func writeData(key, value string) {
    rwMutex.Lock()
    defer rwMutex.Unlock()
    data[key] = value
    fmt.Println("Writing:", key, value)
}

func main() {
    data = make(map[string]string)

    go writeData("key1", "value1")

    for i := 0; i < 3; i++ {
        go readData("key1")
    }

    time.Sleep(time.Second)
}

在这个例子中,多个 goroutine 同时读取 "key1" 的值,而在同一时刻只有一个 goroutine 写入数据。使用 sync.RWMutex​ 读写锁,我们可以实现读取的并发性,同时确保写操作的互斥性。