I recently used Gunicorn’s Graceful Shutdown feature in a project, so I read the code to learn about Gunicorn’s signal processing.
Master
Gunicorn starts with the following chain of calls:
|
WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
    BaseApplication().run()
        Arbiter(self).run()
|
The main control logic of the Master is implemented in Arbiter, including signal handling and the main loop. Calling Arbiter().run() eventually calls Arbiter.init_signals(), which registers a handler for each signal the Master needs to handle.
|
Arbiter().run():
    Arbiter().start()
        Arbiter().init_signals()  # initialize Master signal handling
            # initialize all signals
            for s in self.SIGNALS:  # "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH"
                signal.signal(s, self.signal)
            signal.signal(signal.SIGCHLD, self.handle_chld)
|
All of these signals (except CHLD, which goes to handle_chld) share one handler, Arbiter().signal(). It stores the received signal in Arbiter.SIG_QUEUE, which holds at most 5 pending signals, and then calls Arbiter().wakeup(). Arbiter().wakeup() writes a byte to the PIPE pipe, whose main purpose is to wake up the master loop.
|
def signal(self, sig, frame):
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')
    except IOError as e:
        if e.errno not in [errno.EAGAIN, errno.EINTR]:
            raise
|
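The queue-plus-pipe pattern described above can be tried out in isolation. The following is a minimal sketch, not Gunicorn's code: the class MiniArbiter and its attribute names are made up for illustration, and the handler is called directly instead of being registered with signal.signal(), so the example stays deterministic.

```python
import errno
import os
import select
import signal


class MiniArbiter:
    """Toy model of a bounded signal queue plus a self-pipe (hypothetical names)."""

    MAX_PENDING = 5  # same cap as the SIG_QUEUE check shown above

    def __init__(self):
        self.sig_queue = []
        self.pipe = os.pipe()
        for fd in self.pipe:
            # Non-blocking, so neither the handler nor the drain loop can hang.
            os.set_blocking(fd, False)

    def on_signal(self, sig, frame):
        # Do as little as possible here: remember the signal, poke the pipe.
        if len(self.sig_queue) < self.MAX_PENDING:
            self.sig_queue.append(sig)
            self.wakeup()

    def wakeup(self):
        try:
            os.write(self.pipe[1], b".")
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.EINTR):
                raise

    def sleep(self, timeout=1.0):
        """Wait until the pipe is readable or the timeout expires."""
        ready, _, _ = select.select([self.pipe[0]], [], [], timeout)
        if not ready:
            return False
        try:
            # Drain every wake-up byte so the next sleep() really waits.
            while os.read(self.pipe[0], 1):
                pass
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.EINTR):
                raise
        return True


arbiter = MiniArbiter()
for _ in range(7):
    arbiter.on_signal(signal.SIGHUP, None)  # simulate 7 signals in a burst
print(len(arbiter.sig_queue))  # only 5 are kept
```

The point of the pipe is that the loop can sleep in select() and still react immediately: the handler only records the signal, and the real work happens later in the main loop.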
The TERM and INT handlers both end by raising StopIteration; the main difference is that INT (like QUIT) first calls Arbiter().stop(False), while TERM does not. The HUP handler simply calls Arbiter().reload().
|
def handle_hup(self):
    """\
    HUP handling.
    - Reload configuration
    - Start the new worker processes with a new configuration
    - Gracefully shutdown the old worker processes
    """
    self.log.info("Hang up: %s", self.master_name)
    self.reload()

def handle_term(self):
    "SIGTERM handling"
    raise StopIteration

def handle_int(self):
    "SIGINT handling"
    self.stop(False)
    raise StopIteration

def handle_quit(self):
    "SIGQUIT handling"
    self.stop(False)
    raise StopIteration

def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1
    self.manage_workers()
|
Once the Master has registered its signal handlers, it enters the master loop.
|
try:
    # Start workers: spawn more if there are too few, kill some if there
    # are more than expected.
    self.manage_workers()
    while True:
        self.maybe_promote_master()
        sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
        if sig is None:
            # No pending signal: Arbiter().sleep() waits on Arbiter.PIPE and
            # returns once data arrives (or its select() timeout expires).
            self.sleep()
            # After waking up, clean up workers that are no longer usable.
            self.murder_workers()
            # Re-run the spawn/cleanup worker logic.
            self.manage_workers()
            # Next iteration: sig will be the signal that woke the Master.
            continue
        if sig not in self.SIG_NAMES:  # ignore signals that need no handling
            self.log.info("Ignoring unknown signal: %s", sig)
            continue
        signame = self.SIG_NAMES.get(sig)  # look up the signal name
        # Look up the Master's handler for this signal.
        handler = getattr(self, "handle_%s" % signame, None)
        if not handler:
            self.log.error("Unhandled signal: %s", signame)
            continue
        self.log.info("Handling signal: %s", signame)
        handler()  # run the signal handler
        self.wakeup()
except (StopIteration, KeyboardInterrupt):
    # TERM or INT raise StopIteration, which lands here: Arbiter().halt()
    # cleans up and exits the process.
    self.halt()
|
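The dispatch part of this loop (signal number, then name, then a handle_<name> method found through getattr) can be sketched on its own. This is a simplified model under stated assumptions: MiniMaster, its events list and the "idle" marker are invented for the example, and the toy loop stops where the real one would sleep on the pipe.

```python
import signal


class MiniMaster:
    """Toy dispatch loop; a simplified model, not Gunicorn's Arbiter."""

    # Map signal numbers to lowercase names, e.g. SIGHUP -> "hup".
    SIG_NAMES = {
        getattr(signal, "SIG" + name): name.lower()
        for name in ("HUP", "TERM", "INT", "USR1")
    }

    def __init__(self, pending):
        self.sig_queue = list(pending)
        self.events = []  # records what the loop did, for inspection

    def handle_hup(self):
        self.events.append("reload")

    def handle_term(self):
        # Same trick as above: the handler ends the loop with an exception.
        raise StopIteration

    def run(self):
        try:
            while True:
                if not self.sig_queue:
                    # The real loop would sleep on the pipe; the toy stops.
                    self.events.append("idle")
                    break
                sig = self.sig_queue.pop(0)
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if handler is None:
                    self.events.append("unhandled")
                    continue
                handler()
        except StopIteration:
            self.events.append("halt")
        return self.events


master = MiniMaster([signal.SIGHUP, signal.SIGUSR1, signal.SIGTERM])
print(master.run())
```

Looking handlers up by name keeps the loop generic: supporting another signal only requires adding a handle_<name> method.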
Worker
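When the master creates a worker, the worker first executes worker.init_process() to initialize the process. Among other things, this registers the worker's signal handlers and then enters the run loop: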
|
self.init_signals()
...
# Enter main run loop
self.booted = True
self.run()
|
init_signals() resets the signal handlers inherited from the Master and installs the worker's own:
|
def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
        signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    signal.signal(signal.SIGWINCH, self.handle_winch)
    signal.signal(signal.SIGUSR1, self.handle_usr1)
    signal.signal(signal.SIGABRT, self.handle_abort)
    # Don't let SIGTERM and SIGUSR1 disturb active requests
    # by interrupting system calls
    signal.siginterrupt(signal.SIGTERM, False)
    signal.siginterrupt(signal.SIGUSR1, False)
    if hasattr(signal, 'set_wakeup_fd'):
        signal.set_wakeup_fd(self.PIPE[1])
|
Unlike the Master, the Worker does not queue signals in a SIG_QUEUE; each handler runs directly when its signal arrives.
|
def handle_usr1(self, sig, frame):
    self.log.reopen_files()

def handle_exit(self, sig, frame):
    self.alive = False

def handle_quit(self, sig, frame):
    self.alive = False
    # worker_int callback
    self.cfg.worker_int(self)
    time.sleep(0.1)
    sys.exit(0)

def handle_abort(self, sig, frame):
    self.alive = False
    self.cfg.worker_abort(self)
    sys.exit(1)
|
The worker's run loop checks self.alive at the top of every iteration and keeps accepting and processing requests only while it is True, as in run_for_one():
|
def run_for_one(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        self.notify()

        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue

        except EnvironmentError as e:
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                               errno.EWOULDBLOCK):
                raise

        if not self.is_parent_alive():
            return

        try:
            self.wait(timeout)
        except StopWaiting:
            return
|
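The effect of the flag can be shown with a small simulation. This is only a sketch: MiniWorker and the term_during parameter are hypothetical, requests are plain strings, and the TERM signal is simulated by calling the handler directly while a request is being processed.

```python
class MiniWorker:
    """Toy worker: finishes the request in flight, then sees alive is False."""

    def __init__(self, requests):
        self.alive = True
        self.pending = list(requests)
        self.handled = []

    def handle_exit(self, sig=None, frame=None):
        # Like the TERM handler above: only flip the flag, do not exit.
        self.alive = False

    def run(self, term_during=None):
        while self.alive and self.pending:
            req = self.pending.pop(0)
            if req == term_during:
                # Simulate SIGTERM arriving in the middle of this request.
                self.handle_exit()
            # The request that was already in progress still completes.
            self.handled.append(req)
        return self.handled


worker = MiniWorker(["a", "b", "c"])
print(worker.run(term_during="b"))  # "c" is never picked up
```

Because the handler never interrupts the work in progress, the request being served when TERM arrives is finished before the loop condition is checked again.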
Complete process
Take the HUP signal as an example. The Gunicorn documentation describes how HUP gracefully restarts the worker processes:
HUP: Reload the configuration, start the new worker processes with a new configuration and gracefully shutdown older workers. If the application is not preloaded (using the preload_app option), Gunicorn will also load the new version of it.
When the master process receives the HUP signal:
- The master loop wakes up and runs murder_workers and manage_workers.
- On the next iteration, the master loop takes HUP from the queue and looks up its handler.
- It executes Arbiter().handle_hup().
- handle_hup() calls Arbiter().reload(), which reloads the configuration, reloads the WSGI app, and starts new workers according to the worker configuration. It then calls manage_workers. Because the number of workers is now double the expected number, manage_workers stops the original workers: it calls Arbiter().kill_worker(), which calls os.kill with the TERM signal.
- The master's signal handling is now complete, and Arbiter().wakeup() is executed, after which the master returns to the blocking part of its loop.
When a Worker process receives the TERM signal:
- It executes Worker().handle_exit(), which sets self.alive to False.
- The worker's run loop checks self.alive at the start of each iteration. Once it is no longer True, the loop exits after the current request has been processed, which is what makes the shutdown graceful.
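The master side of the HUP flow above (spawn a full new generation, then let manage_workers retire the surplus, oldest first) can be modelled in memory. This is a rough sketch under stated assumptions: MiniPool is a made-up class, workers are just increasing "age" numbers, and kill_worker records the worker instead of calling os.kill with TERM.

```python
class MiniPool:
    """In-memory model of reload: spawn a new generation, retire the old one."""

    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.next_age = 0
        self.workers = []     # ages of live workers, oldest first
        self.terminated = []  # workers that were sent the (simulated) TERM
        self.manage_workers()

    def spawn_worker(self):
        self.next_age += 1
        self.workers.append(self.next_age)

    def kill_worker(self, age):
        # Stand-in for os.kill(pid, signal.SIGTERM) in a real master.
        self.workers.remove(age)
        self.terminated.append(age)

    def manage_workers(self):
        # Too few workers: spawn more. Too many: stop the oldest ones.
        while len(self.workers) < self.num_workers:
            self.spawn_worker()
        while len(self.workers) > self.num_workers:
            self.kill_worker(self.workers[0])

    def reload(self):
        # Start a full new generation first, so capacity never drops...
        for _ in range(self.num_workers):
            self.spawn_worker()
        # ...then trim back to num_workers, which removes the old generation.
        self.manage_workers()


pool = MiniPool(3)
pool.reload()
print(pool.workers, pool.terminated)
```

Spawning before killing is what lets the reload happen without a gap in service: the new workers are already running when the old ones are asked to finish their current requests and exit.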