chan introduction
We can inspect the compiler's assembly output like this.
From that output we can see directly which runtime functions the chan operations are compiled into.
Source code analysis
Structs and creation
qcount is the number of elements currently held in the chan's buffer, that is, elements that have been sent but not yet received. The len function returns the value of this field.
dataqsiz is the capacity of the queue buffer, and the cap function returns the value of this field. buf is a pointer to the queue buffer, which is a fixed-length circular (ring) array.
elemtype and elemsize are the type of the elements in the chan and the size of one element.
sendx: the index in the buffer where the next sent element will be written.
recvx: the index in the buffer from which the next element will be received.
recvq and sendq are the queue of goroutines waiting to receive data and the queue of goroutines waiting to send data, respectively.
Both sendq and recvq are of the struct type waitq.
A waitq is a doubly linked list of sudog objects, each of which wraps a waiting goroutine. The overall layout of a chan looks like this.
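The relationship between these fields and the built-in functions can be observed from ordinary user code. The following is a minimal sketch; the helper name bufferState is made up for illustration, and the mapping of len and cap to qcount and dataqsiz is the one described above.

```go
package main

import "fmt"

// bufferState is a hypothetical helper that reports len and cap for a channel.
// As described above, len reflects qcount and cap reflects dataqsiz.
func bufferState(ch chan int) (queued, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2

	queued, capacity := bufferState(ch)
	fmt.Println(queued, capacity) // 2 elements buffered, capacity 3

	<-ch // receiving one element frees a slot in the buffer
	queued, capacity = bufferState(ch)
	fmt.Println(queued, capacity) // 1 element buffered, capacity still 3
}
```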
Next, let's look at how a chan is created. The assembly output also shows that make(chan int) is compiled into a call to the makechan function in the runtime.
First, look at how hchanSize is calculated.
maxAlign is 8, so maxAlign-1 is 111 in binary. ANDing the negated value of int(unsafe.Sizeof(hchan{})) with 111 keeps only its lowest three bits, which is exactly the padding needed to reach the next multiple of 8. Adding that padding to the struct size gives hchanSize, an integer multiple of 8 that is used for alignment.
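The bit trick is easier to follow with concrete numbers. This is a small sketch of the same rounding technique, not the runtime code itself; the function name roundUp and the sample sizes are chosen for illustration.

```go
package main

import "fmt"

// maxAlign mirrors the alignment value of 8 discussed above.
const maxAlign = 8

// roundUp is a hypothetical helper that rounds size up to a multiple of
// maxAlign. The low three bits of -size are the number of padding bytes.
func roundUp(size uintptr) uintptr {
	padding := uintptr(-int(size)) & (maxAlign - 1)
	return size + padding
}

func main() {
	for _, size := range []uintptr{96, 97, 100} {
		fmt.Println(size, "->", roundUp(size))
	}
	// 96 is already aligned and stays 96; 97 and 100 both round up to 104.
}
```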
The switch here has three cases. In the first case the required buffer size is 0, so only sizeof(hchan) bytes (rounded up to hchanSize) need to be allocated for the hchan.
In the second case the required buffer size is not 0 and the element type does not contain pointers, so hchan and buf are allocated as one contiguous block of memory. Note that the element type of a channel can itself be a pointer type, such as chan *int, in which case this second case does not apply.
In the third case the required buffer size is not 0 and the element type contains pointers. Instead of using add to place buf directly after hchan, a separate block of memory is requested for buf.
Sending data
Blocking and non-blocking channel operations
Before looking at the code that sends data, let's clarify what blocking and non-blocking mean for a channel.
Normally, a goroutine that sends data to a channel blocks until the send succeeds. In that case the argument passed to the runtime is block=true, i.e., a blocking call.
A send is non-blocking in only one case: a select statement that has a single send case and a default branch.
The compiler rewrites that statement into a call to selectnbsend.
selectnbsend passes block=false to chansend.
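As a sketch of what a non-blocking send looks like in user code, the helper below wraps the select-with-default form. The name trySend is an assumption made for this example; the select statement itself is standard Go.

```go
package main

import "fmt"

// trySend is a hypothetical helper that attempts a send without blocking.
// It reports whether the value was handed to the channel.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The send could not proceed immediately.
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer has room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full and nobody is receiving

	var nilCh chan int
	fmt.Println(trySend(nilCh, 3)) // false: a nil channel is never ready
}
```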
chansend method
From the assembly output we can see that sending data to a channel is implemented in the runtime by chansend. The function is rather long, so we will go through it in sections.
The first check is on the chan itself. If it is nil, a non-blocking send returns false immediately, while a blocking send parks the goroutine forever and never returns.
These reads are done without holding the lock, so even if closed is read as "not closed", the channel may switch from open to closed in the instant before the next read. There are then two possibilities.
- the channel is not closed and is full: the send must return false, which is correct.
- the channel is closed and full: a non-blocking send still returns false, which is also fine.
The checks above are called the fast path. Taking the lock is a relatively expensive operation, so any decision that can be made before locking is best made before locking.
Here’s a look at the locking part of the code.
Once the lock is held, the closed state has to be checked again, because sending on a closed channel panics. Then a receiver is dequeued from recvq. If a receiver is already waiting, the value being sent is handed directly to that first receiver. The point to note is that if a receiver is waiting in the queue, the buffer must be empty at this moment.
Since we are analyzing the code line by line, let's step into send to see how it is implemented.
In the send function, sg is the sudog that wraps the waiting goroutine and ep is a pointer to the data being sent. sendDirect calls memmove to copy the data into the receiver's memory, and goready then wakes up the receiving goroutine so that it can be scheduled.
Now go back to chansend and continue.
This part checks whether the buffer is full. If it is not, it finds the slot in buf at index sendx, calls typedmemmove to copy the data into that slot, and then advances sendx, wrapping it back to 0 when it reaches the end of the buffer.
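The index handling can be modeled with a small ring buffer. This is a simplified, hypothetical model written for this article (the type ring and its method put are not runtime names), with an int slice standing in for the raw buffer memory.

```go
package main

import "fmt"

// ring is a simplified model of the channel buffer: buf plays the role of
// hchan.buf, qcount the number of stored elements, sendx the next write slot.
type ring struct {
	buf    []int
	qcount int
	sendx  int
}

func newRing(n int) *ring { return &ring{buf: make([]int, n)} }

// put stores v at sendx and advances sendx, wrapping to 0 at the end.
// It returns false when the buffer is full, where a real send would block.
func (r *ring) put(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

func main() {
	r := newRing(2)
	fmt.Println(r.put(10), r.sendx) // stored in slot 0, sendx moves to 1
	fmt.Println(r.put(20), r.sendx) // stored in slot 1, sendx wraps to 0
	fmt.Println(r.put(30), r.sendx) // buffer full, nothing stored
}
```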
A non-blocking call returns immediately at this point. A blocking call creates a sudog object, enqueues it on sendq, and then calls gopark, which puts the goroutine into the waiting state and releases the lock. Once gopark has been called, the statement that sends data to the channel appears to the user to be blocked.
It is also important to note that an unbuffered channel (buffer size 0) ends up here too when no receiver is waiting, and calls gopark to block immediately. When using one, remember to receive the data, otherwise the goroutine that sends to the chan blocks forever, e.g.
If the select here returns on the timeout branch without ever executing result := <-ch, the sending goroutine blocks forever.
That concludes the send path. The whole process is roughly as follows.
For example, to execute ch <- 10:
- check whether recvq is empty; if it is not, dequeue the goroutine at the head of recvq and send the data directly to it.
- if recvq is empty and buf is not full, copy the data into buf.
- if buf is full (or the channel has no buffer), pack the data to be sent and the current goroutine into a sudog, enqueue it on sendq, and block the current goroutine in the waiting state.
Receiving data
The function that receives data from a chan is chanrecv. Let's look at how it is implemented.
Like chansend, chanrecv starts with a nil check. If the chan has not been initialized, a non-blocking call returns (false, false) immediately, while a blocking call parks the goroutine forever.
The next two if statements are explained together, because this is where chanrecv differs from chansend: chanrecv has to return different results depending on the condition.
The check before locking handles the boundary conditions of a non-blocking call. If the chan has nothing to receive, meaning either it is unbuffered (dataqsiz is 0) with an empty send queue, or it is buffered (dataqsiz > 0) with an empty buffer (qcount == 0), and the chan is not closed, the call returns (false, false). If the chan is closed and buf holds no data, it returns (true, false).
To meet this requirement, the boundary conditions inside chanrecv are read using atomic operations.
The call must correctly observe "closed and buf empty" in order to return (true, false) rather than (false, false). Because these reads happen before the lock is taken, atomic loads are used to prevent reordering (a happens-before guarantee), so the order of the qcount and closed reads is fixed by the atomic operations.
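The difference between the two results is visible from user code through a non-blocking receive. The following sketch uses a hypothetical helper tryRecv; its selected result corresponds to the first return value discussed above and ok to the second.

```go
package main

import "fmt"

// tryRecv is a hypothetical helper that performs a non-blocking receive.
// selected reports whether the receive case ran; ok is false when the value
// comes from a closed, drained channel.
func tryRecv(ch chan int) (v int, ok, selected bool) {
	select {
	case v, ok = <-ch:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	open := make(chan int, 1)
	fmt.Println(tryRecv(open)) // open and empty: the default branch runs

	closed := make(chan int, 1)
	closed <- 7
	close(closed)
	fmt.Println(tryRecv(closed)) // buffered data is still delivered after close
	fmt.Println(tryRecv(closed)) // closed and drained: zero value with ok=false
}
```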
If a sender is waiting in the queue, the data is taken from that sender and the sender is woken up. Note that because a sender is waiting, a buffered channel must have a full buffer at this point.
Before waking the sender, the buffer has to be considered. If there is no buffer, the data is copied directly from the sender. If there is a buffer, the slot at recvx is located first, the data in that slot is copied to the receiver, and then the sender's data is copied into the same slot.
recvx is then incremented by 1, which is equivalent to placing the new data at the tail of the queue, the value of recvx is assigned to sendx, and finally goready is called to wake the sender. This is a bit roundabout, so let's illustrate it with pictures.
The first picture shows chansend copying data into the buffer. When the buffer becomes full, sendx wraps around to 0, so in a full ring queue sendx equals recvx.
Next, see how the buffered data is handed over in chanrecv when the sender queue is not empty.
Here the data at recvx (index 0) is copied from the buffer to the receiver, then the sender's data is copied into the slot at recvx, then recvx is incremented by 1 and assigned to sendx. Because the buffer is full, incrementing recvx has the effect of appending the newly added data to the tail of the queue.
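The slot swap can be checked with a toy model. The code below is a simplified sketch, not the runtime implementation: the type ring and the method recvWithWaitingSender are invented names, and the buffer is assumed to be full, as it must be when a sender is waiting.

```go
package main

import "fmt"

// ring models a full channel buffer together with its two indexes.
type ring struct {
	buf   []int
	recvx int
	sendx int
}

// recvWithWaitingSender models a receive on a full buffer while a blocked
// sender holds the value pending. The oldest element goes to the receiver
// and the sender's value takes over the freed slot.
func (r *ring) recvWithWaitingSender(pending int) int {
	v := r.buf[r.recvx]
	r.buf[r.recvx] = pending
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.sendx = r.recvx // the buffer is still full, so both indexes coincide
	return v
}

func main() {
	r := &ring{buf: []int{1, 2, 3}} // full buffer, recvx == sendx == 0
	fmt.Println(r.recvWithWaitingSender(4), r.buf, r.recvx, r.sendx)
	fmt.Println(r.recvWithWaitingSender(5), r.buf, r.recvx, r.sendx)
}
```

Successive receives return 1 and then 2, so the value written into the freed slot is read only after every older element, which preserves FIFO order.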
Moving on to the next part of the code.
Reaching this point means there is data in the buffer but no sender waiting in the queue. The data is copied to the receiving goroutine, then recvx is moved forward, wrapping to 0 if it has reached the end of the queue. Finally qcount, the number of elements in the buffer, is decremented by one and the lock is released.
The following code handles the case where there is no data in the buffer.
If it is a non-blocking call, it returns directly; a blocking call will wrap the current goroutine into a sudog, then add the sudog to the receive queue and call gopark to block the goroutine and wait to be woken up.
Closing the channel
Closing a channel calls the closechan function.
- the function first checks whether the chan has been initialized (closing a nil chan panics), then takes the lock and checks whether the chan is already closed (closing it again panics). If both checks pass, it sets the closed field to 1.
- it iterates over all waiting receivers and senders and adds their goroutines to the glist.
- it adds all goroutines in the glist to the scheduling queue so that they are woken up. Note that a sender panics after being woken up.
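The effects of the steps above can be observed from user code. This is a minimal sketch; the helper panics is a made-up name that uses recover to report whether a call panicked. For simplicity it sends on an already closed channel rather than closing a channel under a parked sender.

```go
package main

import "fmt"

// panics is a hypothetical helper that reports whether calling f panics.
func panics(f func()) (didPanic bool) {
	defer func() {
		if recover() != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

func main() {
	ch := make(chan int)
	done := make(chan bool)
	go func() {
		_, ok := <-ch // completes with ok=false once the channel is closed
		done <- ok
	}()
	close(ch)
	fmt.Println(<-done) // false: the receiver got the zero value

	fmt.Println(panics(func() { ch <- 1 }))   // send on a closed channel
	fmt.Println(panics(func() { close(ch) })) // closing twice

	var nilCh chan int
	fmt.Println(panics(func() { close(nilCh) })) // closing a nil channel
}
```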
Summary
chan is a very powerful tool in Go and can be used to build many things, but to use it efficiently we should also understand how it is implemented internally. This article walked step by step through how Go's chan is implemented, what to watch out for when using it, and how the ring queue behind buf is maintained. I hope it helps.