Computer I/O models come in many varieties; the most widely used today are non-blocking I/O (NIO), epoll, and select. Matching different scenarios with different I/O models is the right approach.
Network io in golang
golang is naturally suited to concurrency. Why? One reason is its lightweight goroutines; the other is its abstraction over complex I/O, which simplifies everything.
For example, if we simply access an http service, a few simple lines of code will do the trick:
client := &http.Client{}
url := "http://dummy.faketld/" // placeholder URL
resp, err := client.Get(url)
if err != nil {
	log.Fatal(err)
}
defer resp.Body.Close()
So what optimizations does golang make for I/O, and how does it achieve such a simple interface?
Goroutine scheduling for io events
We assume here that you already have some knowledge of goroutine scheduling.
We know that in Go, each P is bound to an M (machine thread), and each M has a g0 that scans its P's local run queue for a g to run, or takes one from the global queue.
We also know that when a running g blocks, it hands execution back to g0 so that g0 can reschedule. So how does a g hand execution back to g0 when it hits an I/O event? This is where today's main character, netpoll, comes in.
netpoll
The Go language's network poller uses the I/O multiplexing model to handle I/O operations, but it does not choose the most common system call, select. select can also provide I/O multiplexing, but it comes with more limitations:
- Limited listening capability - it can monitor at most 1024 file descriptors; the limit can be raised manually, but at a relatively high cost in every respect.
- High memory-copy overhead - a fairly large data structure holding the file descriptors must be maintained and copied into the kernel.
- Time complexity - after the number of ready events is returned, all file descriptors still need to be traversed.
golang officially wraps network event polling in a unified, platform-independent way, providing concrete implementations for epoll/kqueue/port/AIX/Windows:
src/runtime/netpoll_epoll.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_windows.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_fake.go
These modules, which implement the same functionality on different platforms, form a common tree structure. When the compiler compiles a Go program, it selects the specific branch of the tree to compile based on the target platform.
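The per-platform selection is done with file-level build constraints; for instance, the epoll implementation is only compiled for Linux. A sketch (the exact constraint syntax has changed across Go versions):

// at the top of runtime/netpoll_epoll.go
//go:build linux

package runtime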
The methods that must be implemented are:
- netpollinit - initializes the network poller; `sync.Once` and the `netpollInited` variable guarantee the function is called only once.
- netpollopen - listens for edge-triggered events on a file descriptor. It is reached via poll_runtime_pollOpen, which creates the event and adds it to the watch set, writing the user-space goroutine's pollDesc into the epoll instance and thereby linking user space with kernel space (see the sketch after this list).
- netpoll - polls the network and returns a set of goroutines that are already ready to run; the argument passed in determines its behavior:
  - if the argument is less than 0, it blocks waiting for a file descriptor to become ready
  - if the argument equals 0, it polls without blocking
  - if the argument is greater than 0, it blocks for at most that long (periodic polling)
- netpollBreak - wakes up the network poller; for example, when a timer is moved to an earlier time, this function is used to interrupt the poller.
- netpollIsPollDescriptor - reports whether a file descriptor is used by the poller.
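As a concrete example of netpollopen, the Linux implementation looks roughly like this (a slightly abbreviated sketch based on older versions of runtime/netpoll_epoll.go; epfd is the runtime's global epoll instance). It registers the fd in edge-triggered mode and stashes the pollDesc pointer in the event so it can be recovered later:

func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	// edge-triggered, interested in both read and write readiness
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	// store the *pollDesc in the epoll event so netpoll can find it again
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}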
There are two important structures in netpoll: pollDesc and pollCache.
//pollCache
//pollDesc
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
- rseq and wseq - indicate that the file descriptor has been reused or the timer has been reset.
- rg and wg - binary semaphores: they hold pdReady, pdWait, the goroutine waiting for the file descriptor to become readable or writable, or nil (see the constants sketch below).
- rd and wd - deadlines for waiting for the file descriptor to become readable or writable.
- rt and wt - timers for waiting on the file descriptor.
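For reference, rg and wg hold either one of these sentinel values or the *g of the parked goroutine. A sketch of the constants as they appear in older versions of runtime/netpoll.go (the exact representation has changed in newer releases):

const (
	pdReady uintptr = 1 // an I/O readiness notification is pending
	pdWait  uintptr = 2 // a goroutine is preparing to park on this fd
	// any other non-zero value stored in rg/wg is the *g of the parked goroutine
)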
For I/O, golang does a lot of unified encapsulation under runtime/netpoll (user code actually goes through the internal/poll package, which then calls into the runtime package). internal/poll also defines a pollDesc of the same name, but it only holds a pointer to the runtime one. (A detail about internal: packages under internal cannot be imported from outside.)
type pollDesc struct {
runtimeCtx uintptr
}
In the end everything is a call into the runtime, but internal/poll wraps it in some easier-to-use methods, such as read and write, and adds some abstraction.
func runtime_pollServerInit()                                // initialize the poller
func runtime_pollOpen(fd uintptr) (uintptr, int)             // open: register an fd with the poller
func runtime_pollClose(ctx uintptr)                          // close
func runtime_pollWait(ctx uintptr, mode int) int             // wait
func runtime_pollWaitCanceled(ctx uintptr, mode int) int     // wait, returning when the wait is canceled
func runtime_pollReset(ctx uintptr, mode int) int            // reset state for reuse
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) // set the read/write deadline
func runtime_pollUnblock(ctx uintptr)                        // unblock
func runtime_isPollServerDescriptor(fd uintptr) bool
// ctx here is actually the handle of an I/O fd (the runtime pollDesc), not a context
// mode is 'r' or 'w'; after all, I/O events only come in these two kinds
// d means roughly the same as a time duration; it is about deadlines
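Before moving to the runtime side, here is roughly how internal/poll wraps these linknamed functions (an abbreviated sketch based on internal/poll/fd_poll_runtime.go; convertErr maps the runtime result code back to a Go error):

func (pd *pollDesc) wait(mode int, isFile bool) error {
	if pd.runtimeCtx == 0 {
		return errors.New("waiting for unsupported file type")
	}
	res := runtime_pollWait(pd.runtimeCtx, mode)
	return convertErr(res, isFile)
}

// waitRead/waitWrite just forward to wait with the right mode.
func (pd *pollDesc) waitRead(isFile bool) error  { return pd.wait('r', isFile) }
func (pd *pollDesc) waitWrite(isFile bool) error { return pd.wait('w', isFile) }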
The specific implementation of these methods is under runtime, so let’s pick a few important ones.
// push the goroutines whose I/O events are ready onto the ready-goroutine list
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
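To see where netpollready gets called from, here is a heavily abbreviated sketch of the Linux netpoll loop (based on older runtime/netpoll_epoll.go; error handling, netpollBreak wakeups and other details are omitted):

func netpoll(delay int64) gList {
	var waitms int32
	if delay < 0 {
		waitms = -1 // block until something is ready
	} else if delay == 0 {
		waitms = 0 // non-blocking poll
	} else {
		waitms = int32(delay / 1e6) // block for at most delay nanoseconds
	}
	var events [128]epollevent
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			// recover the pollDesc stored by netpollopen and collect its goroutines
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}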
// called when polling: returns true if the I/O is ready, false if it is not ready yet
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == 0 {
// gopark is an important function: it essentially gives up the current goroutine's execution, usually returning to g0 so that g0 can reschedule
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
// fetch the goroutine waiting on this I/O; if it is already closed, return nil directly
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set pdReady for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if atomic.Casuintptr(gpp, old, new) {
if old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}
Think about:
- Two goroutines a and b: b blocks on I/O, the I/O completes, but b has not yet been given the scheduler's attention. What happens?
- Two goroutines a and b: b blocks on I/O with a 2s timeout, but a keeps hold of the CPU and b is not scheduled until 5s later; by the time b runs, the deadline has long passed. Does this count as a timeout or not?
So the timeout you set does not necessarily measure real I/O waiting time; the goroutine may simply not have gotten the chance to run.
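To make the second question concrete, here is a user-level illustration (readWithTimeout is a hypothetical helper; conn and buf are placeholders). The deadline is enforced by the poller, but the timeout error only surfaces once this goroutine is scheduled again:

// readWithTimeout sets a 2s read deadline and reads once.
func readWithTimeout(conn net.Conn, buf []byte) (int, error) {
	if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil {
		return 0, err
	}
	n, err := conn.Read(buf)
	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		// handle the timeout; the actual wall-clock wait may have been longer than 2s
	}
	return n, err
}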
How is the read event triggered?
A write is an I/O operation we initiate actively, but a read is passive: how does it get performed?
First, a structure to understand: golang identifies all network connections and file reads/writes with an FD (located under the internal/poll package).
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed.
csema uint32
// Non-zero if this file has been set to blocking mode.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool
// Whether this is a file rather than a network socket.
isFile bool
}
We can see that the FD embeds a pollDesc, and it is through this pollDesc that the platform-specific I/O implementations inside the runtime package get called.
When we perform a read operation, the code looks like this (excerpt):
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
If the read would block, it calls the waitRead method, which mainly calls runtime_pollWait.
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
The control here lies mainly in netpollblock, which we covered above: when the I/O is not yet ready it simply gives up the current goroutine's execution; otherwise the I/O event is already ready and the read can just proceed.
Summary
Overall flow: listenStream -> bind & listen & init -> pollDesc.Init -> poll_runtime_pollOpen -> runtime.netpollopen -> epollctl(EPOLL_CTL_ADD)
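As a user-level illustration of where this flow starts (a minimal sketch; :8080 is an arbitrary port): nothing below touches epoll directly, yet net.Listen, Accept and Read all go through internal/poll and netpoll under the hood.

package main

import (
	"io"
	"log"
	"net"
)

func main() {
	// net.Listen is where the flow above begins:
	// listenStream -> ... -> runtime.netpollopen -> epollctl(EPOLL_CTL_ADD)
	ln, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal(err)
	}
	for {
		// Accept blocks via pollDesc.waitRead on the listening fd.
		conn, err := ln.Accept()
		if err != nil {
			continue
		}
		go func(c net.Conn) {
			defer c.Close()
			// Read/Write park this goroutine via netpollblock whenever
			// the connection is not ready.
			io.Copy(c, c)
		}(conn)
	}
}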
A diagram makes this easier to understand; admittedly, I was lazy and just went looking for an existing one instead of drawing it.
When a g in golang hits an I/O event, it is wrapped in a unified way: first a system-level event is registered (this article focuses on epoll), then the g gives up the CPU (gopark) and other g's run as scheduling continues. When the I/O completes, the scheduler consults epoll to see whether it is ready (the epoll ready list); if it is, the corresponding g is popped and execution continues from where it left off. (In reality there is some extra logic around popping the ready list, such as delayed processing.)
runtime/proc.go.
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
// The conditions here and in handoffp must agree: if
// findrunnable would return a G to run, handoffp must start
// an M.
top:
_p_ := _g_.m.p.ptr()
//......
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
//......
}
netpoll is also called from sysmon.
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
lock(&sched.lock)
sched.nmsys++
checkdead()
unlock(&sched.lock)
//......
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
//......
}
epoll
epoll is maintained by the operating system kernel, not by Go itself.
Constants
FD_CLOEXEC sets the close-on-exec flag on a file descriptor, meaning the descriptor is closed automatically when the process calls exec. This, emm, is a bit hard to grasp at first.
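A rough illustration (the raw socket here is just an arbitrary descriptor to mark; the standard library exposes this flag via syscall.CloseOnExec):

fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
if err != nil {
	log.Fatal(err)
}
// Equivalent to fcntl(fd, F_SETFD, FD_CLOEXEC): the descriptor is closed
// automatically when the process execs a new program.
syscall.CloseOnExec(fd)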
gc
pollDesc is maintained by pollCache and is not tracked by the GC (it is allocated with persistentalloc), so for normal I/O we must close the descriptor explicitly so that the references held by epoll get cleaned up (the cleanup goes through poll_runtime_pollClose, which returns the pollDesc to the cache). The allocation looks like this:
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
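In practice this just means always closing your connections and files. A minimal sketch ("example.com:80" is a placeholder address):

conn, err := net.Dial("tcp", "example.com:80")
if err != nil {
	log.Fatal(err)
}
// Closing releases the fd, removes it from epoll, and returns the
// runtime pollDesc to the pollCache for reuse.
defer conn.Close()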
sysmon
The Go runtime provides a thread that monitors your application and helps surface any bottlenecks it may hit. This thread is called sysmon, the system monitor. In the GMP model, this special thread is not attached to any P, which means the scheduler does not manage it, and it is therefore always running.
The sysmon thread has a wide range of roles, mainly in the following areas:
- Timers created by the application. The sysmon thread looks for timers that should already be firing but are still waiting to run; in that case it wakes idle M's and P's so those timers run as soon as possible.
- Network pollers and system calls. It makes goroutines that were blocked on network operations runnable again.
- Garbage collector (if it has not been running for a long time). If the garbage collector has not run for two minutes, sysmon will force a round of garbage collection (GC).
- Preemption of long-running goroutines. Any goroutine that runs for more than 10 milliseconds will be preempted, leaving the running time for other goroutines.