01 Channel definition
|
// /usr/data/go1.17/go/src/runtime/chan.go
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size of one element, in bytes
	closed   uint32         // non-zero once the channel has been closed
	elemtype *_type         // element type
	sendx    uint           // send index
	recvx    uint           // receive index
	recvq    waitq          // list of recv waiters
	sendq    waitq          // list of send waiters
	lock     mutex          // protects all fields in hchan
}
type waitq struct {
	first *sudog
	last  *sudog
}
|
The underlying data structure of a channel is hchan. Its buf field points to an array that is used as a circular queue (ring buffer), not a linked list.
- qcount is the number of elements currently stored in the buffer.
- dataqsiz is the capacity of the circular queue.
- elemsize is the size in bytes of one element. It is used to compute how much memory the buffer needs and where each slot begins.
- elemtype is the type of the elements.
- sendx and recvx are the indexes in buf where the next send writes and the next receive reads, respectively.
- sendq and recvq are the queues of goroutines blocked on send (the buffer is full or there is no buffer) and on receive (no data is available), respectively. Each is a doubly linked list whose nodes are *sudog values, each representing a waiting g.
- closed indicates whether the channel has been closed.
- lock is a mutex that protects all fields of hchan, not only buf.
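To make the roles of qcount, dataqsiz, sendx and recvx concrete, here is a minimal sketch of a circular queue in ordinary Go. It is a toy model, not the runtime's code: the names ring, newRing, push and pop are hypothetical, the element type is fixed to int, and there is no locking and no wait queue.

```go
package main

import "fmt"

// ring is a toy model of the buffer-related fields of hchan.
// Illustrative only: no lock, no sendq/recvq.
type ring struct {
	buf      []int // stands in for hchan.buf
	qcount   int   // number of elements currently stored
	dataqsiz int   // capacity of the buffer
	sendx    int   // index of the next slot to write
	recvx    int   // index of the next slot to read
}

func newRing(size int) *ring {
	return &ring{buf: make([]int, size), dataqsiz: size}
}

// push stores v at sendx; it reports false when the buffer is full.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around to the start of the array
	}
	r.qcount++
	return true
}

// pop removes the element at recvx; it reports false when the buffer is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around to the start of the array
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: both slots are occupied
	v, _ := r.pop()
	fmt.Println(v) // 1: elements leave in FIFO order
	r.push(3)      // reuses slot 0, which pop just freed
	fmt.Println(r.sendx, r.recvx, r.qcount)
}
```

Because both indexes wrap to 0 when they reach dataqsiz, the array is reused indefinitely without moving elements.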
02 Initialize channel
|
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	// Buffer size in bytes = element size * capacity, checked for overflow.
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero: allocate only the hchan header.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers: allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers: allocate hchan and buf separately.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)
	...
	return c
}
|
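Initialization fills in an hchan. makechan computes the buffer size in bytes (element size times capacity), rejects sizes that overflow or exceed the allocation limit, and then allocates the memory: only the hchan header when the buffer size is zero, header and buffer in a single allocation when the elements contain no pointers, and two separate allocations otherwise. A channel is created with a statement such as the following.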
|
ch := make(chan int, 8)
// hchan.dataqsiz = 8
// hchan.elemtype = int
// hchan.elemsize = 8 (bytes, assuming a 64-bit platform)
// hchan.buf      = pointer to a buffer with room for 8 ints
|
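The hchan fields cannot be read from user code, but some of them can be observed indirectly: for a channel, the built-in len reports the number of queued elements (qcount) and cap reports the buffer capacity (dataqsiz). The sketch below checks this for the channel created above; describe is a hypothetical helper, and the element size it prints depends on the platform.

```go
package main

import (
	"fmt"
	"unsafe"
)

// describe is a hypothetical helper. It returns the user-visible
// counterparts of qcount, dataqsiz and elemsize for a chan int.
func describe(ch chan int) (queued, capacity int, elemSize uintptr) {
	var zero int
	return len(ch), cap(ch), unsafe.Sizeof(zero)
}

func main() {
	ch := make(chan int, 8)
	ch <- 1
	ch <- 2

	queued, capacity, elemSize := describe(ch)
	fmt.Println("queued elements:", queued)     // 2
	fmt.Println("buffer capacity:", capacity)   // 8
	fmt.Println("element size (bytes):", elemSize) // 8 on 64-bit platforms, 4 on 32-bit
}
```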
03 Sending data
|
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// A receiver is already waiting: hand the value to it directly
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// The buffer has a free slot: copy the value into it
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		...
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	// The buffer is full or there is no buffer: enqueue the current
	// goroutine on sendq and block until a receiver arrives
	...
	gp := getg()
	mysg := acquireSudog()
	...
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	...
}
|
- A channel connects different goroutines, so it may be operated on from multiple Ps at the same time, which would be a data race. Sending and receiving therefore both take the channel's lock.
- When a receiver is already waiting in recvq, the value is handed to it directly by calling the send() function.
- Otherwise, when the buffer has a free slot, the value is written to the buffer.
- When the buffer is full or the channel has no buffer, the sending goroutine is enqueued on sendq and blocks until a receiver takes the value.
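The send paths listed above, along with the closed-channel check in chansend, can be observed from ordinary Go code. The following is a minimal sketch; trySend and sendPanics are hypothetical helpers. A select with a default case performs a non-blocking send, so it returns false exactly where a plain send would have blocked.

```go
package main

import "fmt"

// trySend is a hypothetical helper that attempts a non-blocking send.
// It reports false where a plain `ch <- v` would have blocked.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendPanics is a hypothetical helper that reports whether a send on ch panics.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- 0
	return false
}

func main() {
	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: the buffer has a free slot
	fmt.Println(trySend(buffered, 2)) // false: buffer full, no receiver waiting

	unbuffered := make(chan int)
	fmt.Println(trySend(unbuffered, 1)) // false: no buffer, no receiver waiting

	// With a receiver goroutine, a blocking send completes. Whether the
	// runtime takes the direct hand-off path or parks the sender first
	// depends on which goroutine reaches the channel first.
	done := make(chan int)
	go func() { done <- <-unbuffered }()
	unbuffered <- 42
	fmt.Println(<-done) // 42

	closed := make(chan int, 1)
	close(closed)
	fmt.Println(sendPanics(closed)) // true: send on closed channel
}
```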
|
// runtime/chan.go
typedmemmove(c.elemtype, qp, ep)
|
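Sending is therefore a memory copy. In the line above, the value at the sender's address ep is copied into the buffer slot qp. In the direct path, send() instead copies the value straight into the memory of the waiting receiver goroutine, without going through the buffer.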
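One visible consequence of this copy is that the receiver gets the value as it was at the time of the send, regardless of what the sender does to its own variable afterwards. A minimal sketch, where point and sendThenMutate are hypothetical names chosen for illustration:

```go
package main

import "fmt"

type point struct{ x, y int }

// sendThenMutate sends p on a buffered channel, modifies the sender's
// copy afterwards, and returns what a receiver gets from the channel.
func sendThenMutate() point {
	ch := make(chan point, 1)
	p := point{1, 2}
	ch <- p   // the value of p is copied into the channel buffer
	p.x = 100 // changes only the sender's variable
	return <-ch
}

func main() {
	fmt.Println(sendThenMutate()) // {1 2}
}
```

Note that this applies to the value itself: if the element type is a pointer, slice or map, only the reference is copied and the data it refers to is still shared.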
04 Receive data
|
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// A sender is waiting in sendq: receive directly from it
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// The buffer holds data: copy the next element out of it
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		...
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		...
	}
	// Nothing to receive: enqueue the current goroutine on recvq and block until a sender arrives
	...
	gp := getg()
	mysg := acquireSudog()
	...
	gp.param = nil
	c.recvq.enqueue(mysg)
	...
}
|
- If a sending g is waiting in sendq, receive directly from it by calling recv().
- Otherwise, if the buffer holds data, copy the next element out of the buffer.
- Otherwise, enqueue the current g on recvq and block until a sender arrives.
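The receive paths can be observed from user code in the same way as the send paths. The sketch below is illustrative only; tryRecv and handoff are hypothetical helpers. A select with a default case performs a non-blocking receive, so it reports false where a plain receive would have blocked.

```go
package main

import "fmt"

// tryRecv is a hypothetical helper that attempts a non-blocking receive.
// It reports false where a plain `<-ch` would have blocked.
func tryRecv(ch chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

// handoff is a hypothetical helper that receives from an unbuffered
// channel fed by a sender goroutine. Depending on scheduling, the
// receiver either finds the sender already waiting or blocks until
// the sender arrives.
func handoff(v int) int {
	ch := make(chan int)
	go func() { ch <- v }()
	return <-ch
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	fmt.Println(tryRecv(ch)) // 1 true: copied out of the buffer
	fmt.Println(tryRecv(ch)) // 2 true: FIFO order
	fmt.Println(tryRecv(ch)) // 0 false: buffer empty, no sender waiting
	fmt.Println(handoff(7))  // 7
}
```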