This article looks at how the Go language HTTP standard library is implemented.
The standard HTTP service model has two ends: a client and a server. The client sends an HTTP request, and the server receives it, processes it and returns a response to the client. The job of an HTTP server, therefore, is to accept requests from clients and send responses back.
A typical HTTP service should look like the following diagram.
HTTP client
The simplest way to request data in Go is to call the Get function of the net/http package directly:
```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	resp, err := http.Get("http://httpbin.org/get?name=luozhiyun&age=27")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(body))
}
```
Let’s go through this example.
http.Get simply calls DefaultClient.Get. DefaultClient is a zero-value Client (declared as var DefaultClient = &Client{}), so the call ends up in the Client's Get method.
Client struct
```go
type Client struct {
	Transport     RoundTripper
	CheckRedirect func(req *Request, via []*Request) error
	Jar           CookieJar
	Timeout       time.Duration
}
```
The Client struct has four fields:
Transport: the mechanism that actually makes individual HTTP requests; it manages the connection used for the client's request and waits for the server's response. If it is nil, DefaultTransport is used.
CheckRedirect: specifies the policy for handling redirects.
Jar: the cookie jar; it inserts the relevant cookies into every outgoing request and is updated with the cookies of every incoming response.
Timeout: the overall time limit for a request made by this Client, covering connection time, any redirects and reading the response body.
Initialising the request
```go
func (c *Client) Get(url string) (resp *Response, err error) {
	// Build the request from the method, the URL and the (empty) body
	req, err := NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	// Execute the request
	return c.Do(req)
}
```
To send a request, the client first has to build a complete request (method, URL, headers and body) from its arguments. Only then is that request executed.
NewRequest initialises the request
NewRequest just calls NewRequestWithContext with context.Background(). That function returns a Request struct that holds all the information for an HTTP request.
Request
The Request struct has a large number of fields. Several familiar ones, such as Method, URL, Proto, Header, Body and Host, are populated in NewRequestWithContext.
NewRequestWithContext
```go
func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) {
	...
	// parse url
	u, err := urlpkg.Parse(url)
	if err != nil {
		return nil, err
	}
	rc, ok := body.(io.ReadCloser)
	if !ok && body != nil {
		rc = ioutil.NopCloser(body)
	}
	u.Host = removeEmptyPort(u.Host)
	req := &Request{
		ctx:        ctx,
		Method:     method,
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(Header),
		Body:       rc,
		Host:       u.Host,
	}
	...
	return req, nil
}
```
NewRequestWithContext parses the URL, wraps the body in an io.ReadCloser if it is not one already (using ioutil.NopCloser), and returns the populated Request.
Preparing an HTTP request
As shown above, Client.Get passes the request to Do, which eventually calls the Client's send method:
```go
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
	resp, didTimeout, err = send(req, c.transport(), deadline)
	if err != nil {
		return nil, didTimeout, err
	}
	...
	return resp, nil, nil
}
```
Transport
Client.send calls c.transport() to choose the RoundTripper (the Client's own Transport if it is set, otherwise DefaultTransport) and then passes it to the package-level send function for further processing. DefaultTransport is defined as follows:
```go
var DefaultTransport RoundTripper = &Transport{
	// HTTP proxy policy: read proxy settings from the environment
	Proxy: ProxyFromEnvironment,
	DialContext: (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
		DualStack: true,
	}).DialContext,
	ForceAttemptHTTP2: true,
	// Maximum number of idle connections (across all hosts)
	MaxIdleConns: 100,
	// How long an idle connection is kept before it is closed
	IdleConnTimeout: 90 * time.Second,
	// TLS handshake timeout
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
```
Transport is the struct that actually sends HTTP requests and waits for the responses; it implements the RoundTripper interface:
```go
type RoundTripper interface {
	RoundTrip(*Request) (*Response, error)
}
```
RoundTripper has a single method, RoundTrip, which executes one HTTP transaction: it takes a Request and returns the Response for it.
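Because Client only depends on this one-method interface, any type with a RoundTrip method can sit between the Client and the real Transport. Below is a minimal sketch of such a middleware; headerTransport, the X-Demo header and fetchHeader are hypothetical names, and httptest is used only to have a local server to talk to.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// headerTransport is a hypothetical RoundTripper that counts requests and
// adds a header before delegating to another RoundTripper.
type headerTransport struct {
	next  http.RoundTripper
	count int64
}

func (h *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	atomic.AddInt64(&h.count, 1)
	// RoundTrip should not modify the caller's request, so work on a clone.
	r := req.Clone(req.Context())
	r.Header.Set("X-Demo", "roundtripper")
	return h.next.RoundTrip(r)
}

// fetchHeader starts a local server that echoes the X-Demo header and
// requests it through headerTransport.
func fetchHeader() (string, int64, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, r.Header.Get("X-Demo"))
	}))
	defer srv.Close()

	ht := &headerTransport{next: http.DefaultTransport}
	client := &http.Client{Transport: ht}
	resp, err := client.Get(srv.URL)
	if err != nil {
		return "", 0, err
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return string(b), atomic.LoadInt64(&ht.count), err
}

func main() {
	got, n, err := fetchHeader()
	fmt.Println(got, n, err) // roundtripper 1 <nil>
}
```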
Back in Client.send, the package-level send function hands the main work to Transport's RoundTrip method.
RoundTrip in turn calls the unexported roundTrip method:
```go
func (t *Transport) roundTrip(req *Request) (*Response, error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)
	...
	for {
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, ctx.Err()
		default:
		}
		// Wrap the request
		treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}
		// Get a connection
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			t.setReqCanceler(cancelKey, nil)
			req.closeBody()
			return nil, err
		}
		// Send the request and wait for the response
		var resp *Response
		if pconn.alt != nil {
			// HTTP/2 path.
			t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			resp.Request = origReq
			return resp, nil
		}
		...
	}
}
```
The roundTrip method does two things:
- it calls the Transport's getConn method to obtain a connection;
- once it has a connection, it calls persistConn's roundTrip method to send the request and wait for the response (HTTP/2 connections go through pconn.alt.RoundTrip instead).
Getting a connection getConn
getConn has two phases:
- call queueForIdleConn to try to obtain an idle connection;
- if none is available, call queueForDial to create a new connection and wait for it.
```go
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}
	// Wrap the request in a wantConn struct
	w := &wantConn{
		cm:         cm,
		key:        cm.key(),
		ctx:        ctx,
		ready:      make(chan struct{}, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	defer func() {
		if err != nil {
			w.cancel(t, err)
		}
	}()
	// Try to get an idle connection
	if delivered := t.queueForIdleConn(w); delivered {
		pc := w.pc
		...
		t.setReqCanceler(treq.cancelKey, func(error) {})
		return pc, nil
	}
	// Create a new connection
	t.queueForDial(w)
	select {
	// This branch runs once a connection has been delivered
	case <-w.ready:
		...
		return w.pc, w.err
	...
	}
}
```
Get idle connection queueForIdleConn
Case 1: an idle connection is available.
Obtaining an idle connection takes the following steps:
- look up the idle-connection map (idleConn) using the key of the current request, which is derived from its proxy, scheme and target address;
- if a list of idle connections exists for that key, take the last (most recently used) connection in the list;
- hand it to the request and return.
Case 2: no idle connection is available.
- look up the idle-connection map using the key of the current request;
- if it holds no usable connection, add the wantConn to the idleConnWait map, where requests wait for an idle connection.
The diagram above should make this step clear; here is a brief walk through the code to make the logic concrete.
```go
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}
	t.idleMu.Lock()
	defer t.idleMu.Unlock()
	t.closeIdle = false
	if w == nil {
		return false
	}
	// Compute the cut-off time for idle connections
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}
	// Look for most recently-used idle connection.
	// Find the list of idle connections with the same key
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		delivered := false
		for len(list) > 0 && !stop {
			// Take the last connection in the list
			pconn := list[len(list)-1]
			// Check whether this connection has been idle for too long
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				go pconn.closeConnIfStillIdle()
			}
			// Broken or idle for too long: drop it and try the next one
			if pconn.isBroken() || tooOld {
				list = list[:len(list)-1]
				continue
			}
			// Try to hand this connection to w
			delivered = w.tryDeliver(pconn, nil)
			if delivered {
				if pconn.alt != nil {
					// HTTP/2: multiple clients can share pconn.
					// Leave it in the list.
				} else {
					// HTTP/1: only one client can use pconn.
					// Remove it from the list.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			stop = true
		}
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			// No idle connections left for this key: remove the key from the map
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}
	// No idle connection was found
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	// Add the wantConn to the map of requests waiting for an idle connection
	q := t.idleConnWait[w.key]
	q.cleanFront()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
```
The comments above are clear enough, so I won’t explain them here.
Establishing a connection queueForDial
If no idle connection can be obtained, the Transport tries to establish a new one. As the diagram above shows, this takes the following steps:
- queueForDial checks whether MaxConnsPerHost is unset (zero or negative) or whether this host is still below that limit;
- if the limit has been reached, the wantConn is put into the connsPerHostWait map to wait;
- otherwise, dialConnFor is called in a new goroutine to create the connection;
- dialConnFor calls dialConn, which creates the TCP connection and starts two goroutines (readLoop and writeLoop) to read and write data, and then calls tryDeliver to hand the connection to the wantConn.
Let's look at the code.
```go
func (t *Transport) queueForDial(w *wantConn) {
	w.beforeDial()
	// Zero or negative means no limit: dial asynchronously
	if t.MaxConnsPerHost <= 0 {
		go t.dialConnFor(w)
		return
	}
	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()
	// This host is below its connection limit: dial asynchronously
	if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
		if t.connsPerHost == nil {
			t.connsPerHost = make(map[connectMethodKey]int)
		}
		t.connsPerHost[w.key] = n + 1
		go t.dialConnFor(w)
		return
	}
	// This host has reached its connection limit: join the wait queue
	if t.connsPerHostWait == nil {
		t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.connsPerHostWait[w.key]
	q.cleanFront()
	q.pushBack(w)
	t.connsPerHostWait[w.key] = q
}
```
In other words, if MaxConnsPerHost is zero or negative (no limit), or the host has not yet reached the limit, the connection is dialed asynchronously in a new goroutine; otherwise the wantConn waits in connsPerHostWait.
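The counting-plus-queue logic can be sketched in isolation. The hostLimiter type below is a hypothetical model, not net/http code: acquire corresponds to queueForDial deciding whether a dial may start, and release mirrors what decConnsPerHost does when a connection goes away, handing the slot straight to the oldest waiter instead of decrementing the count.

```go
package main

import (
	"fmt"
	"sync"
)

// hostLimiter is a hypothetical simplification of connsPerHost and
// connsPerHostWait: at most max dials per key, extra callers wait FIFO.
type hostLimiter struct {
	mu   sync.Mutex
	max  int
	n    map[string]int
	wait map[string][]chan struct{}
}

func newHostLimiter(max int) *hostLimiter {
	return &hostLimiter{max: max, n: map[string]int{}, wait: map[string][]chan struct{}{}}
}

// acquire returns a channel that is closed once the caller may dial.
func (l *hostLimiter) acquire(key string) <-chan struct{} {
	ready := make(chan struct{})
	if l.max <= 0 { // no limit: dial immediately, nothing to count
		close(ready)
		return ready
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.n[key] < l.max {
		l.n[key]++
		close(ready)
		return ready
	}
	l.wait[key] = append(l.wait[key], ready)
	return ready
}

// release frees a slot, or hands it directly to the oldest waiter.
func (l *hostLimiter) release(key string) {
	if l.max <= 0 {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if q := l.wait[key]; len(q) > 0 {
		close(q[0])
		if len(q) == 1 {
			delete(l.wait, key)
		} else {
			l.wait[key] = q[1:]
		}
		return
	}
	l.n[key]--
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	l := newHostLimiter(1)
	first := l.acquire("example.com:443")
	second := l.acquire("example.com:443")
	fmt.Println(isClosed(first), isClosed(second)) // true false
	l.release("example.com:443")
	fmt.Println(isClosed(second)) // true
}
```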
dialConnFor
```go
func (t *Transport) dialConnFor(w *wantConn) {
	defer w.afterDial()
	// Dial the connection
	pc, err := t.dialConn(w.ctx, w.cm)
	// Try to hand the connection to the wantConn
	delivered := w.tryDeliver(pc, err)
	// The dial succeeded but the connection was not delivered (or it is a
	// shareable HTTP/2 connection): put it in the idle pool, where it can be
	// handed to a request waiting in idleConnWait, or close it
	if err == nil && (!delivered || pc.alt != nil) {
		t.putOrCloseIdleConn(pc)
	}
	if err != nil {
		t.decConnsPerHost(w.key)
	}
}
```
dialConnFor calls dialConn to create the TCP connection and then calls tryDeliver to hand it to the wantConn. If the dial fails, decConnsPerHost releases the per-host slot that queueForDial reserved (when a limit is set).
dialConn
```go
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
	// Create the persistConn struct
	pconn = &persistConn{
		t:             t,
		cacheKey:      cm.key(),
		reqch:         make(chan requestAndChan, 1),
		writech:       make(chan writeRequest, 1),
		closech:       make(chan struct{}),
		writeErrCh:    make(chan error, 1),
		writeLoopDone: make(chan struct{}),
	}
	...
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		...
	} else {
		// Dial the TCP connection
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
	}
	...
	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
			if e, ok := alt.(http2erringRoundTripper); ok {
				// pconn.conn was closed by next (http2configureTransport.upgradeFn).
				return nil, e.err
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
	// Start goroutines that read and write data for this connection
	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
```
dialConn configures the connection differently depending on the scheme; the excerpt above shows the path for an ordinary HTTP connection. For HTTP it dials a TCP connection, starts the readLoop and writeLoop goroutines to handle reads and writes on it, and returns the new connection. If TLS negotiated a protocol registered in TLSNextProto (typically HTTP/2), it instead returns a persistConn whose alt field handles the requests.
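The pooling described in the last few sections can be observed from outside with the net/http/httptrace package: the GotConn hook reports whether a request got a reused connection. This sketch assumes only documented httptrace and httptest APIs; reusedFlags is a hypothetical helper. The first request has to go through queueForDial, while later ones should be served by queueForIdleConn, provided each response body is read to EOF and closed.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
)

// reusedFlags makes n sequential GET requests to a local test server and
// records, via httptrace, whether each one got a reused pooled connection.
func reusedFlags(n int) ([]bool, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "ok")
	}))
	defer srv.Close()

	client := &http.Client{Transport: &http.Transport{}}
	defer client.CloseIdleConnections()

	var flags []bool
	for i := 0; i < n; i++ {
		var reused bool
		trace := &httptrace.ClientTrace{
			GotConn: func(info httptrace.GotConnInfo) { reused = info.Reused },
		}
		req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
		if err != nil {
			return nil, err
		}
		req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}
		// Draining and closing the body lets the connection go back
		// to the idle pool for the next request.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		flags = append(flags, reused)
	}
	return flags, nil
}

func main() {
	flags, err := reusedFlags(3)
	fmt.Println(flags, err)
}
```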
Waiting for a response
This section is a little more complicated, but really interesting.
When a connection is created, two channels are initialized: writech, which carries the request data to be written, and reqch, which tells readLoop which request the next response belongs to. As mentioned above, two goroutines, readLoop and writeLoop, are also started for the connection to handle reading and writing.
Once a connection has been obtained, its roundTrip method is called. It first sends the request data on the writech channel; writeLoop receives it and writes the request to the connection.
roundTrip then sends a requestAndChan struct on the reqch channel and loops, waiting for a result. readLoop reads the response, wraps it in a responseAndError struct, and sends it on the channel stored in the requestAndChan. roundTrip receives it, leaves the loop and returns.
roundTrip
```go
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	...
	writeErrCh := make(chan error, 1)
	// Send the request data to writeLoop through the writech channel
	pc.writech <- writeRequest{req, writeErrCh, continueCh}
	// Channel on which readLoop will deliver the response
	resc := make(chan responseAndError)
	// Wrap the response channel in a requestAndChan and send it to readLoop through reqch
	pc.reqch <- requestAndChan{
		req:       req.Request,
		cancelKey: req.cancelKey,
		ch:        resc,
		...
	}
	...
	for {
		testHookWaitResLoop()
		select {
		// A response (or an error) has arrived
		case re := <-resc:
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugRoundTrip {
				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
			}
			if re.err != nil {
				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
			}
			// Return the response
			return re.res, nil
		...
		}
	}
}
```
Here the request is wrapped in a writeRequest and sent on writech, the channel for the response is wrapped in a requestAndChan and sent on reqch, and the loop then waits for the response.
writeLoop then receives the writeRequest and writes out the request data:
```go
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			// Write the request to the TCP connection, sending it to the target server
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			...
		case <-pc.closech:
			return
		}
	}
}
```
The writeRequest received from the writech channel is written to the TCP connection and sent to the target server.
readLoop
```go
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default value, if not changed below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()
	...
	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		// Block until the server sends data or the connection fails
		_, err := pc.br.Peek(1)
		...
		// Receive the requestAndChan sent by roundTrip
		rc := <-pc.reqch
		trace := httptrace.ContextClientTrace(rc.req.Context())
		var resp *Response
		if err == nil {
			// Read the response
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}
		...
		// Send the response back to roundTrip on the channel it provided
		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}
		...
	}
}
```
readLoop reads the response for each request from the TCP connection and sends it back on the channel supplied by roundTrip, which receives it and returns the response to the caller.
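To tie roundTrip, writeLoop and readLoop together, here is a toy model of the same channel handshake. Everything in it is hypothetical: the struct names echo the real ones, but a pair of Go channels stands in for the TCP connection, a goroutine plays the server, and roundTrip simply blocks on the response channel instead of selecting on cancellation and errors as the real loop does.

```go
package main

import "fmt"

// Simplified stand-ins for writeRequest, responseAndError and requestAndChan.
type writeRequest struct{ req string }

type responseAndError struct {
	res string
	err error
}

type requestAndChan struct {
	req string
	ch  chan responseAndError
}

type toyConn struct {
	writech chan writeRequest
	reqch   chan requestAndChan
	wire    chan string // requests "sent" over the socket
	replies chan string // responses "received" from the socket
}

func newToyConn() *toyConn {
	c := &toyConn{
		writech: make(chan writeRequest, 1),
		reqch:   make(chan requestAndChan, 1),
		wire:    make(chan string),
		replies: make(chan string),
	}
	go c.fakeServer()
	go c.writeLoop()
	go c.readLoop()
	return c
}

// fakeServer answers requests in order, like an HTTP/1.1 server.
func (c *toyConn) fakeServer() {
	for r := range c.wire {
		c.replies <- "200 OK for " + r
	}
}

// writeLoop takes requests from writech and "writes" them to the socket.
func (c *toyConn) writeLoop() {
	for wr := range c.writech {
		c.wire <- wr.req
	}
}

// readLoop learns from reqch where to deliver the next response.
func (c *toyConn) readLoop() {
	for rc := range c.reqch {
		line := <-c.replies
		rc.ch <- responseAndError{res: line}
	}
}

func (c *toyConn) roundTrip(req string) (string, error) {
	c.writech <- writeRequest{req: req}
	resc := make(chan responseAndError)
	c.reqch <- requestAndChan{req: req, ch: resc}
	re := <-resc
	return re.res, re.err
}

func main() {
	c := newToyConn()
	for _, r := range []string{"GET /a", "GET /b"} {
		res, _ := c.roundTrip(r)
		fmt.Println(res)
	}
	// 200 OK for GET /a
	// 200 OK for GET /b
}
```

Splitting the work this way lets the writing and reading of one connection proceed independently, while the caller only ever waits on its own response channel.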
HTTP server
Again, let's start with a simple example.
```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func HelloHandler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "Hello World")
}

func main() {
	http.HandleFunc("/", HelloHandler)
	// ListenAndServe only returns on error, so log it
	log.Fatal(http.ListenAndServe(":8000", nil))
}
```
I’ll start with a diagram to give a brief overview of the implementation.
The function names in this example already hint at the main steps:
- register the handler in a map keyed by route pattern, so requests can be routed by key lookup;
- after registration, listen in a loop and create a goroutine for every accepted connection;
- inside that goroutine, read requests in a loop, find the matching handler in the routing table by request path, and hand the request to it.
Registering handlers
As in the example above, a handler is registered by calling HandleFunc.
http.HandleFunc calls DefaultServeMux.HandleFunc, which wraps the function in a HandlerFunc and passes it to ServeMux's Handle method:
```go
func (mux *ServeMux) Handle(pattern string, handler Handler) {
	mux.mu.Lock()
	defer mux.mu.Unlock()
	...
	e := muxEntry{h: handler, pattern: pattern}
	mux.m[pattern] = e
	if pattern[len(pattern)-1] == '/' {
		mux.es = appendSorted(mux.es, e)
	}
	if pattern[0] != '/' {
		mux.hosts = true
	}
}
```
Handle wraps the pattern and handler in a muxEntry and stores it in the map mux.m, keyed by the pattern. If the pattern ends in '/', the entry is also added to the []muxEntry slice mux.es, which appendSorted keeps ordered from the longest pattern to the shortest. If the pattern does not start with '/', it contains a host name, and mux.hosts is set to true.
The map is used for exact route matches and the []muxEntry slice for prefix matches.
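The difference between exact and prefix patterns is easy to check against a real ServeMux without opening a socket, by calling ServeHTTP with an httptest.ResponseRecorder. The patterns and the helpers newMux and route are made up for this sketch.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newMux registers one exact pattern and two subtree patterns (ending in '/').
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	reply := func(name string) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, name) }
	}
	mux.Handle("/api/users", reply("exact /api/users"))
	mux.Handle("/api/", reply("subtree /api/"))
	mux.Handle("/", reply("root /"))
	return mux
}

// route returns the body written by whichever handler the mux picked.
func route(mux *http.ServeMux, path string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux()
	for _, p := range []string{"/api/users", "/api/users/1", "/api/orders/7", "/favicon.ico"} {
		fmt.Println(p, "->", route(mux, p))
	}
}
```

Note that "/api/users/1" does not go to the "/api/users" handler: a pattern without a trailing slash only matches exactly, so the request falls back to the longest subtree pattern, "/api/".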
Listening
http.ListenAndServe creates a Server with the given address and handler and calls its ListenAndServe method:
```go
func (srv *Server) ListenAndServe() error {
	if srv.shuttingDown() {
		return ErrServerClosed
	}
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	// Listen on the TCP address
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	// Accept and serve incoming connections in a loop
	return srv.Serve(ln)
}
```
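Since ListenAndServe is just net.Listen followed by Serve, the two steps can also be done by hand, which is handy when the listener must be created first (for example on port 0 so the OS picks a free port). A minimal sketch; startServer and get are hypothetical helpers, and the handler reproduces the Hello World example.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// startServer does by hand what ListenAndServe does: net.Listen, then Serve.
func startServer() (*http.Server, string, error) {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "Hello World")
	})
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, "", err
	}
	srv := &http.Server{Handler: mux}
	go srv.Serve(ln) // returns http.ErrServerClosed after srv.Close
	return srv, "http://" + ln.Addr().String(), nil
}

// get fetches url and returns the response body as a string.
func get(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return string(b), err
}

func main() {
	srv, url, err := startServer()
	if err != nil {
		panic(err)
	}
	defer srv.Close()
	body, err := get(url + "/")
	fmt.Println(body, err) // Hello World <nil>
}
```

Because the socket is already listening when Serve starts, a client can connect immediately; the connection simply waits in the backlog until Accept picks it up.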
Serve
```go
func (srv *Server) Serve(l net.Listener) error {
	...
	baseCtx := context.Background()
	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		// Accept the next network connection from the listener
		rw, err := l.Accept()
		...
		tempDelay = 0
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew)
		// Serve the connection in a new goroutine
		go c.serve(connCtx)
	}
}
```
Serve accepts network connections in a loop and starts a goroutine for each one. Goroutines are cheap, but Serve places no limit on how many it starts, so under very high concurrency a large number of them can be created at once and may exhaust memory or other resources.
Handling requests
Each connection is handled in its own goroutine, which runs conn.serve and processes every request arriving on that connection:
```go
func (c *conn) serve(ctx context.Context) {
	c.remoteAddr = c.rwc.RemoteAddr().String()
	ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
	...
	ctx, cancelCtx := context.WithCancel(ctx)
	c.cancelCtx = cancelCtx
	defer cancelCtx()
	c.r = &connReader{conn: c}
	c.bufr = newBufioReader(c.r)
	c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
	for {
		// Read the next request
		w, err := c.readRequest(ctx)
		...
		// Dispatch the request to the handler chosen by routing
		serverHandler{c.server}.ServeHTTP(w, w.req)
		w.cancelCtx()
		if c.hijacked() {
			return
		}
		w.finishRequest()
		...
	}
}
```
Once a connection is established, all requests on it are processed in this goroutine until the connection is closed or hijacked: the for loop repeatedly calls readRequest and handles each request in turn.
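This can be observed from the server side: with a keep-alive client, consecutive requests arrive from the same remote address, because they travel over one connection handled by one conn.serve goroutine. A small sketch, where remoteAddrs is a hypothetical helper; it relies on srv.Client(), whose Transport keeps connections alive, and on draining each response body so the client can reuse the connection.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"
)

// remoteAddrs sends n sequential requests and returns how many requests
// the server saw from each client address (that is, from each connection).
func remoteAddrs(n int) (map[string]int, error) {
	var mu sync.Mutex
	seen := map[string]int{}
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		mu.Lock()
		seen[r.RemoteAddr]++
		mu.Unlock()
		io.WriteString(w, "ok")
	}))
	defer srv.Close()

	client := srv.Client()
	for i := 0; i < n; i++ {
		resp, err := client.Get(srv.URL)
		if err != nil {
			return nil, err
		}
		io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
		resp.Body.Close()
	}
	mu.Lock()
	defer mu.Unlock()
	return seen, nil
}

func main() {
	seen, err := remoteAddrs(3)
	fmt.Println(len(seen), "connection(s)", err)
}
```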
Request processing is done by calling ServeHTTP.
```go
type serverHandler struct {
	srv *Server
}

func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
	handler := sh.srv.Handler
	if handler == nil {
		handler = DefaultServeMux
	}
	if req.RequestURI == "*" && req.Method == "OPTIONS" {
		handler = globalOptionsHandler{}
	}
	handler.ServeHTTP(rw, req)
}
```
serverHandler is just a thin wrapper around the Server. In our example ListenAndServe was called with a nil handler, so sh.srv.Handler is nil and DefaultServeMux, a *ServeMux, is used instead; ServeMux's ServeHTTP method is therefore what ends up being called.
ServeMux.ServeHTTP looks up the handler via mux.Handler, which eventually calls match to perform the route matching:
```go
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
	v, ok := mux.m[path]
	if ok {
		return v.h, v.pattern
	}
	for _, e := range mux.es {
		if strings.HasPrefix(path, e.pattern) {
			return e.h, e.pattern
		}
	}
	return nil, ""
}
```
match first tries an exact match in the map and returns immediately if it succeeds. Otherwise it walks mux.es, which is sorted from the longest pattern to the shortest, and returns the first pattern that is a prefix of the path: the most specific registered parent route, falling back step by step towards the root route "/". The handler found this way is then called to process the request.
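The whole routing table, a map for exact matches plus a slice of subtree patterns sorted longest first, fits in a short sketch. toyMux is a hypothetical miniature that stores handler names instead of Handlers and uses sort.SliceStable in place of appendSorted. It models the implementation shown above; Go 1.22 reworked ServeMux pattern matching, so newer versions are structured differently.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// toyMux keeps a map for exact matches plus a slice of patterns ending
// in '/', sorted from longest to shortest.
type toyMux struct {
	m  map[string]string // pattern -> handler name
	es []string          // subtree patterns, longest first
}

func (mux *toyMux) handle(pattern, name string) {
	if mux.m == nil {
		mux.m = make(map[string]string)
	}
	mux.m[pattern] = name
	if strings.HasSuffix(pattern, "/") {
		mux.es = append(mux.es, pattern)
		sort.SliceStable(mux.es, func(i, j int) bool { return len(mux.es[i]) > len(mux.es[j]) })
	}
}

func (mux *toyMux) match(path string) string {
	if name, ok := mux.m[path]; ok {
		return name // an exact match wins
	}
	for _, p := range mux.es {
		if strings.HasPrefix(path, p) {
			return mux.m[p] // the first hit is the longest matching prefix
		}
	}
	return ""
}

func main() {
	var mux toyMux
	mux.handle("/", "root")
	mux.handle("/api/", "api")
	mux.handle("/api/v1/", "v1")
	fmt.Println(mux.match("/api/v1/users"), mux.match("/api/x"), mux.match("/index.html"))
	// v1 api root
}
```

Keeping the slice sorted at registration time is what lets match stop at the first prefix hit without comparing lengths on every request.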