For a zero-downtime (graceful) restart of a Go program, we need to solve two problems:
- restarting the process must not close the listening port;
- requests already in progress must either be processed completely or time out.
We'll see later how endless handles both.
Basic concepts
The following briefly introduces two concepts needed for the rest of the article.
Signal handling
Go delivers signal notifications by sending os.Signal values on a channel. For example, pressing Ctrl+C sends a SIGINT signal: the OS interrupts the normal flow of the process, runs the corresponding signal handler, and then resumes execution where it was interrupted. With signal.Notify, Go's runtime handler forwards the signal to the channels registered for it.
```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	// Register for the signals we want to handle
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		// Block until a signal arrives
		sig := <-sigs
		fmt.Println()
		fmt.Println(sig)
		done <- true
	}()
	fmt.Println("awaiting signal")
	// Wait until the goroutine has received a signal
	<-done
	fmt.Println("exiting")
}
```
With a few lines of code, we can listen for SIGINT and SIGTERM. When the process receives one of these signals from the OS, Go sends the signal value on the sigs channel, where the goroutine picks it up.
Forking child processes
Go's os/exec package wraps fork (and exec) for us, and its ExtraFiles field lets the child process inherit open files from the parent.
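```go
// netListener is assumed to be a *net.TCPListener; File returns a dup of its socket
file, err := netListener.File() // this returns a Dup()
if err != nil {
	log.Fatalf("gracefulRestart: failed to get listener file, error: %v", err)
}
path := "/path/to/executable"
args := []string{"-graceful"}
// Create the Cmd instance
cmd := exec.Command(path, args...)
// Forward standard output
cmd.Stdout = os.Stdout
// Forward standard error
cmd.Stderr = os.Stderr
// ExtraFiles[0] becomes fd 3 in the child
cmd.ExtraFiles = []*os.File{file}
// Start the child process without waiting for it
err = cmd.Start()
if err != nil {
	log.Fatalf("gracefulRestart: Failed to launch, error: %v", err)
}
```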
exec.Command takes path (the executable to run) and args (its arguments) and returns a *exec.Cmd. The ExtraFiles field lists additional open files the new process should inherit, and Start launches the child process without waiting for it to finish.
Here netListener.File makes a copy of the file descriptor using the dup family of system calls (recent Go versions use fcntl with F_DUPFD_CLOEXEC, which behaves like dup). For reference, Go's syscall.Dup wrapper looks like this:
```go
func Dup(oldfd int) (fd int, err error) {
	r0, _, e1 := Syscall(SYS_DUP, uintptr(oldfd), 0, 0)
	fd = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}
```
The dup(2) man page describes it as follows:
```text
dup and dup2 create a copy of the file descriptor oldfd.
After successful return of dup or dup2, the old and new descriptors may
be used interchangeably. They share locks, file position pointers and
flags; for example, if the file position is modified by using lseek on
one of the descriptors, the position is also changed for the other.
The two descriptors do not share the close-on-exec flag, however.
```
In other words, the new descriptor refers to the same open file as oldfd and shares its locks, file offset, and file status flags; only the close-on-exec flag (FD_CLOEXEC) belongs to each descriptor separately. The two descriptors are also closed independently: after oldfd is closed, newfd stays valid, and data can still be read from or written to the file through it.
The above diagram shows that a forked child process gets a copy of the parent's file descriptor table. Go opens its descriptors with close-on-exec set, however, so only the files listed in ExtraFiles (plus standard input, output, and error) survive the exec into the new program.
endless restart example
For readers who haven't used endless before, here is a short example; skip it if you are already familiar with it.
```go
package main

import (
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/fvbock/endless"
	"github.com/gorilla/mux"
)

func handler(w http.ResponseWriter, r *http.Request) {
	duration, err := time.ParseDuration(r.FormValue("duration"))
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	time.Sleep(duration)
	w.Write([]byte("Hello World"))
}

func main() {
	mux1 := mux.NewRouter()
	mux1.HandleFunc("/sleep", handler)
	w := sync.WaitGroup{}
	w.Add(1)
	go func() {
		err := endless.ListenAndServe("127.0.0.1:5003", mux1)
		if err != nil {
			log.Println(err)
		}
		log.Println("Server on 5003 stopped")
		w.Done()
	}()
	w.Wait()
	log.Println("All servers stopped. Exiting.")
	os.Exit(0)
}
```
Next, let's verify that the service built with endless keeps serving across a restart:
```shell
# Build the project for the first time
go build main.go
# Run the project; you can now modify the source
./main &
# Send a request that returns after 60s
curl "http://127.0.0.1:5003/sleep?duration=60s" &
# Build again with the new content
go build main.go
# Restart; 17171 is the pid
kill -1 17171
# Send a request to the new API
curl "http://127.0.0.1:5003/sleep?duration=1s"
```
After running the commands above, the first request returns Hello World. Before sending the second request, I changed the handler's return value to Hello world2222, rebuilt the binary, and restarted.
Because the first request returns after 60s and the second after 1s, the second response arrives first, followed by the first.
The whole timeline is shown below.
While the first request is still pending, you can see that two processes are running at the same time:
```shell
$ ps -ef |grep main
root 84636 80539 0 22:25 pts/2 00:00:00 ./main
root 85423 84636 0 22:26 pts/2 00:00:00 ./main
```
Once the first request has returned, checking the processes again shows that the parent process has exited, so the switch from parent to child is seamless.
Implementation Principles
The implementation discussed here is endless's, so the principles and code below are explained through its source.
The principle of the graceful restart is shown in the figure above:
- Listen for the SIGHUP signal.
- On receiving it, fork a child process (using the same start command) and pass it the file descriptor of the socket the service is listening on.
- The child process listens on the parent's socket; at this point both parent and child can accept requests.
- Once it has started successfully, the child sends SIGTERM to the parent, which stops accepting new connections and waits for existing ones to finish (or time out).
- The parent exits, and the upgrade is complete.
Code Implementation
As we can see from the example above, the entry point for endless is the ListenAndServe function.
```go
func ListenAndServe(addr string, handler http.Handler) error {
	// Initialize the server
	server := NewServer(addr, handler)
	// Listen for and handle requests
	return server.ListenAndServe()
}
```
This function has two parts: it first initializes the server, then listens for and handles requests.
Initialising the Server
Let’s first look at what an endless service’s Server structure looks like.
```go
type endlessServer struct {
	// Embeds http.Server
	http.Server
	// The Listener that accepts client connections
	EndlessListener net.Listener
	// Tracks how many client requests are still unfinished
	wg sync.WaitGroup
	// Channel that receives signals
	sigChan chan os.Signal
	// Marks whether this process is the new process started by a restart
	isChild bool
	// State of the current process
	state uint8
	...
}
```
Besides embedding http.Server (and so inheriting all of its fields), endlessServer adds a few status fields, because it needs to listen for signals and know whether it is the new process.
- wg: tracks how many client requests are still outstanding.
- sigChan: the channel used to receive signals.
- isChild: whether this process is the new (child) process started by a restart.
- state: the state of the current process.
Let’s see how to initialize endlessServer.
```go
func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()
	socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")
	// Use the environment variable to decide whether this is a child process
	isChild = os.Getenv("ENDLESS_CONTINUE") != ""
	// Multiple servers are supported, so record the order of each server
	if len(socketOrder) > 0 {
		for i, addr := range strings.Split(socketOrder, ",") {
			socketPtrOffsetMap[addr] = uint(i)
		}
	} else {
		socketPtrOffsetMap[addr] = uint(len(runningServersOrder))
	}
	srv = &endlessServer{
		wg:      sync.WaitGroup{},
		sigChan: make(chan os.Signal),
		isChild: isChild,
		...
		state: STATE_INIT,
		lock:  &sync.RWMutex{},
	}
	srv.Server.Addr = addr
	srv.Server.ReadTimeout = DefaultReadTimeOut
	srv.Server.WriteTimeout = DefaultWriteTimeOut
	srv.Server.MaxHeaderBytes = DefaultMaxHeaderBytes
	srv.Server.Handler = handler
	runningServers[addr] = srv
	...
	return
}
```
The initialization sets the usual net/http parameters, such as the ReadTimeout, the WriteTimeout, and the request Handler.
Note that the ENDLESS_CONTINUE environment variable tells a process whether it is a child; the parent sets it when it forks the child. Because endless supports multiple servers, the ENDLESS_SOCKET_ORDER variable records the order of their listeners.
ListenAndServe
```go
func (srv *endlessServer) ListenAndServe() (err error) {
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	// Handle signals asynchronously
	go srv.handleSignals()
	// Get the listener for the port
	l, err := srv.getListener(addr)
	if err != nil {
		log.Println(err)
		return
	}
	// Wrap the listener in an endlessListener
	srv.EndlessListener = newEndlessListener(l, srv)
	// In a child process, send SIGTERM to the parent
	if srv.isChild {
		syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
	}
	srv.BeforeBegin(srv.Addr)
	// Accept connections on the listener and run the request handlers
	return srv.Serve()
}
```
This method is quite similar to its net/http counterpart: it first obtains a listener for the port, then calls Serve to handle incoming requests.
There are two differences, though: endless listens for signals in order to restart smoothly, and getListener behaves differently, because a child process must inherit the listen fd from its parent so that the listening port is never closed.
handleSignals: Signal Handling
handleSignals registers for a set of signals and then loops, handling each signal it receives according to its type.
```go
func (srv *endlessServer) handleSignals() {
	var sig os.Signal
	// Register the signals to listen for
	signal.Notify(
		srv.sigChan,
		hookableSignals...,
	)
	// Get the pid
	pid := syscall.Getpid()
	for {
		sig = <-srv.sigChan
		// Run the hooks registered for before the signal is handled
		srv.signalHooks(PRE_SIGNAL, sig)
		switch sig {
		// Graceful restart signal
		case syscall.SIGHUP:
			log.Println(pid, "Received SIGHUP. forking.")
			err := srv.fork()
			if err != nil {
				log.Println("Fork err:", err)
			}
		// Shutdown signal
		case syscall.SIGINT:
			log.Println(pid, "Received SIGINT.")
			srv.shutdown()
		// Shutdown signal
		case syscall.SIGTERM:
			log.Println(pid, "Received SIGTERM.")
			srv.shutdown()
		...
		}
		// Run the hooks registered for after the signal is handled
		srv.signalHooks(POST_SIGNAL, sig)
	}
}
```
The code is straightforward: when we run kill -1 $pid, srv.sigChan receives SIGHUP and the case syscall.SIGHUP branch runs.
Note that the syscall.SIGTERM signal that the child sends to the parent in the ListenAndServe method above is also handled here, by running the shutdown logic.
The SIGHUP branch calls the fork function, whose logic we look at next.
```go
func (srv *endlessServer) fork() (err error) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()
	// Make sure we have not forked already
	if runningServersForked {
		return errors.New("Another process already forked. Ignoring this one.")
	}
	runningServersForked = true
	var files = make([]*os.File, len(runningServers))
	var orderArgs = make([]string, len(runningServers))
	// There may be several servers, so collect every listen fd
	for _, srvPtr := range runningServers {
		switch srvPtr.EndlessListener.(type) {
		case *endlessListener:
			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()
		default:
			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()
		}
		orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr
	}
	// Environment variables
	env := append(
		os.Environ(),
		// On startup, endless checks this variable to decide whether it is a child
		"ENDLESS_CONTINUE=1",
	)
	if len(runningServers) > 1 {
		env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ",")))
	}
	// Path of the running program
	path := os.Args[0]
	var args []string
	// Command-line arguments
	if len(os.Args) > 1 {
		args = os.Args[1:]
	}
	cmd := exec.Command(path, args...)
	// Standard output
	cmd.Stdout = os.Stdout
	// Standard error
	cmd.Stderr = os.Stderr
	cmd.ExtraFiles = files
	cmd.Env = env
	err = cmd.Start()
	if err != nil {
		log.Fatalf("Restart: Failed to launch, error: %v", err)
	}
	return
}
```
fork first collects the listen fd of every running server into a slice of files, ordered by socketPtrOffsetMap, and then passes that slice as cmd.ExtraFiles, so the child can take over the ports the parent is listening on without closing them.
Note the ENDLESS_CONTINUE entry in the env list; NewServer checks it when endless starts:
```go
func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()
	socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")
	isChild = os.Getenv("ENDLESS_CONTINUE") != ""
	...
}
```
Let’s see what shutdown does when it receives a SIGTERM signal.
```go
func (srv *endlessServer) shutdown() {
	if srv.getState() != STATE_RUNNING {
		return
	}
	srv.setState(STATE_SHUTTING_DOWN)
	// DefaultHammerTime defaults to 60 seconds
	if DefaultHammerTime >= 0 {
		go srv.hammerTime(DefaultHammerTime)
	}
	// Disable keep-alives and stop accepting new connections
	srv.SetKeepAlivesEnabled(false)
	err := srv.EndlessListener.Close()
	if err != nil {
		log.Println(syscall.Getpid(), "Listener.Close() error:", err)
	} else {
		log.Println(syscall.Getpid(), srv.EndlessListener.Addr(), "Listener closed.")
	}
}
```
shutdown first disables keep-alives and closes the listener: the child process has already started and is accepting connections on the shared socket, so the parent stops accepting new ones. Closing the parent's listener does not close the socket for the child, which holds its own descriptor. shutdown also starts srv.hammerTime in a goroutine: if requests are still in flight after DefaultHammerTime (60 seconds by default), the parent is forcibly shut down.
getListener: Getting the Port Listener
```go
func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {
	// Child process
	if srv.isChild {
		var ptrOffset uint = 0
		runningServerReg.RLock()
		defer runningServerReg.RUnlock()
		// Again, handle the multi-server case
		if len(socketPtrOffsetMap) > 0 {
			// Look up this server's listen fd index from the server order
			ptrOffset = socketPtrOffsetMap[laddr]
		}
		// fds 0, 1 and 2 are reserved for stdin, stdout and stderr, so start at 3
		f := os.NewFile(uintptr(3+ptrOffset), "")
		l, err = net.FileListener(f)
		if err != nil {
			err = fmt.Errorf("net.FileListener error: %v", err)
			return
		}
	} else {
		// Parent process: create a new listener directly
		l, err = net.Listen("tcp", laddr)
		if err != nil {
			err = fmt.Errorf("net.Listen error: %v", err)
			return
		}
	}
	return
}
```
The parent case is simple: it creates a new listener on the port and returns it.
The child case needs more explanation, starting with why the os.NewFile argument starts at 3. When the child inherits fds from the parent, fds 0, 1, and 2 are reserved for standard input, output, and error, so the first file the parent passes in ExtraFiles becomes fd 3 in the child, the second becomes fd 4, and so on.
As shown below, the first three fds are reserved for standard input, output, and error, and the fds from 3 onward follow the order of the ExtraFiles slice.
We can also check this with the opening example. In the child's lsof output below, fd 3 is the listening socket inherited through ExtraFiles, fd 4 is the duplicate that net.FileListener creates from it, and fd 0 is /dev/null because the child's Stdin was left unset.
```shell
# Build the project for the first time
go build main.go
# Run the project; you can now modify the source
./main &
# Look at the files the parent process has open
lsof -P -p 17116
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
...
main 18942 root 0u CHR 136,2 0t0 5 /dev/pts/2
main 18942 root 1u CHR 136,2 0t0 5 /dev/pts/2
main 18942 root 2u CHR 136,2 0t0 5 /dev/pts/2
main 18942 root 3u IPv4 2223979 0t0 TCP localhost:5003 (LISTEN)
# Send a request that returns after 60s
curl "http://127.0.0.1:5003/sleep?duration=60s" &
# Restart; 17116 is the parent's pid
kill -1 17116
# There should now be two main processes
ps -ef |grep ./main
root 17116 80539 0 04:19 pts/2 00:00:00 ./main
root 18110 17116 0 04:21 pts/2 00:00:00 ./main
# The child's pid is 18110; look at the files it has open
lsof -P -p 18110
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
...
main 19073 root 0r CHR 1,3 0t0 1028 /dev/null
main 19073 root 1u CHR 136,2 0t0 5 /dev/pts/2
main 19073 root 2u CHR 136,2 0t0 5 /dev/pts/2
main 19073 root 3u IPv4 2223979 0t0 TCP localhost:5003 (LISTEN)
main 19073 root 4u IPv4 2223979 0t0 TCP localhost:5003 (LISTEN)
# Send a request to the new API
curl "http://127.0.0.1:5003/sleep?duration=1s"
```
Summary
Using endless, we have seen how to restart a service without stopping it. This technique is useful in many scenarios, and readers who have not used it can try it on their own systems.
In general, a hot restart restarts the service without interrupting established connections: the old process stops accepting new connections, and new connections are accepted by the new process. For connections already established with the old process, it can stop reading new requests on them and wait until the in-flight requests have been processed and the connections are idle before exiting.
This ensures that established connections are not interrupted and that the new process can accept connections normally.