nethttp server source
golang net/http Server主要流程源码分析。
时间:2019年3月14日
处理一个http请求大致思路如下:
创建一个net.Listener对象
net.Listener对象Accept一个tcp连接
从tcp连接读取一个请求并创建一个响应体
调用接口处理这个连接的请求并写入数据udao响应
主要堆栈:
http.ListenAndServe(addr string, handler Handler) error
http.*Server.ListenAndServe() error
net.Listen(network, address string) (net.Listener, error)
http.*Server.Serve(l net.Listener) error
http.*Server.setupHTTP2_Serve()
net.Listener.Accept() (net.Conn, error)
http.*Server.newConn(rwc net.Conn) *http.conn
http.*conn.setState(nc net.Conn, state ConnState)
http.*conn.serve(ctx context.Context)
defer http.*conn.serve.func()
http.*conn.rwc.(*tls.Conn)
tls.*Conn.Handshake()
tls.ConnectionState.NegotiatedProtocol
http.*conn.readRequest(ctx context.Context) (w *http.response, err error)
http.serverHandler{http.*conn.server}.ServeHTTP(w, w.req)
Start
启动一个Server
package main
import "fmt"
import "net/http"
func main() {
http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %q", r.URL.Path)
})
http.ListenAndServe(":8080", nil)
}
http.ListenAndServe
首先http.ListenAndServe会创建一个Server,设置地址和Handler,然后调用Server对象的ListenAndServe方法启动。
ListenAndServe方法先检查srv的状态是否是关闭,服务是关闭的就直接退出
然后设置默认地址,利用地址监听tcp连接,最后调用Server对象的Serve的方法,处理这个监听。
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
Serve
Server对象的Serve方法才是处理个监听的主要过程,启动顺序函数如下:
testHookServerServe执行net/http库默认的测试函数。
if fn := testHookServerServe; fn != nil {
fn(srv, l)
}
onceCloseListener对象封装net.Listener对象的Close方法,使用sync.Once对象确保net.Listener只会关闭一次。
l = &onceCloseListener{Listener: l}
defer l.Close()
setupHTTP2_Serve设置http2,如果启用了https默认就是http2,h2可以使用环境变量设置是否启动,具体不分析。
if err := srv.setupHTTP2_Serve(); err != nil {
return err
}
trackListener设置track日志,忽略。
if !srv.trackListener(&l, true) {
return ErrServerClosed
}
defer srv.trackListener(&l, false)
baseCtx是Server一个监听的根Context。
baseCtx := context.Background()
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for循环处理Accept到的连接。
for {
rw, e := l.Accept()
...
}
如果Accept返回err,会srv.getDoneChan()方法检测Server是否结束,后序忽略。
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
Accept获得了一个net.Conn连接对象,使用srv.newConn方法创建一个http.conn连接。
http.conn连接就是http连接,设置连接状态用于连接复用,然后c.serve处理这个http连接。
c := srv.newConn(rw)
c.setState(c.rwc, StateNew)
go c.serve(ctx)
http.Server.Serve完整源码如下:
func (srv *Server) Serve(l net.Listener) error {
if fn := testHookServerServe; fn != nil {
fn(srv, l)
}
l = &onceCloseListener{Listener: l}
defer l.Close()
if err := srv.setupHTTP2_Serve(); err != nil {
return err
}
if !srv.trackListener(&l, true) {
return ErrServerClosed
}
defer srv.trackListener(&l, false)
var tempDelay time.Duration
baseCtx := context.Background()
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, e := l.Accept()
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
if ne, ok := e.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
time.Sleep(tempDelay)
continue
}
return e
}
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew)
go c.serve(ctx)
}
}
net.conn.serve
net.conn.serve处理一个http连接。
WithValue设置Context,忽略。
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
defer部分捕捉panic抛出的错误,然后Server对象输出,如果Server对象设置了log.Logger,就输出到log,否则输出到默认。
defer func() {
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
...
}()
defer后部分,检测http连接的状态,非劫持状态就关闭连接,设置http状态成关闭;劫持状态一般在Websock下,使用Hijack方法获取了tcp连接,然后自定义处理。
if !c.hijacked() {
c.close()
c.setState(c.rwc, StateClosed)
}
c.rwc就是net.Conn的连接,net.Conn实现了Reader、Writer、Closer接口,所以缩写rwc。
rwc连接断言判断是否是tls.Conn连接,判断是否是https连接;如果是就设置rwc的超时,
if tlsConn, ok := c.rwc.(*tls.Conn); ok {
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
...
}
tlsConn.Handshake()是检测tls握手是否正常,不正常就返回http 400的响应并关闭连接;
if err := tlsConn.Handshake(); err != nil {
if re, ok := err.(tls.RecordHeaderError); ok && re.Conn != nil && tlsRecordHeaderLooksLikeHTTP(re.RecordHeader) {
io.WriteString(re.Conn, "HTTP/1.0 400 Bad Request\r\n\r\nClient sent an HTTP request to an HTTPS server.\n")
re.Conn.Close()
return
}
c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
return
}
获取NegotiatedProtocol信息,就是NextProto的值,如果值是h2,就使用h2的连接处理;h2详细见h2握手分析,此tls部分可忽略,是tls的ALPN扩展的支持。
c.tlsState = new(tls.ConnectionState)
*c.tlsState = tlsConn.ConnectionState()
if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
if fn := c.server.TLSNextProto[proto]; fn != nil {
h := initNPNRequest{tlsConn, serverHandler{c.server}}
fn(c.server, tlsConn, h)
}
return
}
注意:tlsConn.Handshake()一定要执行,是验证tls握手,然后才会有NegotiatedProtocol等tls连接信息。
注意:NegotiatedProtocol是tls的ALPN扩展的关键,h2协议握手下的值就是h2
tls部分完整如下:
if tlsConn, ok := c.rwc.(*tls.Conn); ok {
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
if err := tlsConn.Handshake(); err != nil {
if re, ok := err.(tls.RecordHeaderError); ok && re.Conn != nil && tlsRecordHeaderLooksLikeHTTP(re.RecordHeader) {
io.WriteString(re.Conn, "HTTP/1.0 400 Bad Request\r\n\r\nClient sent an HTTP request to an HTTPS server.\n")
re.Conn.Close()
return
}
c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
return
}
c.tlsState = new(tls.ConnectionState)
*c.tlsState = tlsConn.ConnectionState()
if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
if fn := c.server.TLSNextProto[proto]; fn != nil {
h := initNPNRequest{tlsConn, serverHandler{c.server}}
fn(c.server, tlsConn, h)
}
return
}
}
tls部分没将请求变成h2就继续按http/1.1处理请求。
newBufioReader部分对rwc,使用bufio变成缓冲读写。
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
然后for循环调用net.conn.readRequest读取一个请求,并创建ResponseWriter对象。
for {
w, err := c.readRequest(ctx)
...
}
如果读取请求错误,就直接返回4xx错误响应并关闭连接。
if err != nil {
const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"
if err == errTooLarge {
const publicErr = "431 Request Header Fields Too Large"
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
c.closeWriteAndWait()
return
}
if isCommonNetReadError(err) {
return
}
publicErr := "400 Bad Request"
if v, ok := err.(badRequestError); ok {
publicErr = publicErr + ": " + string(v)
}
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
return
}
req := w.req
if req.expectsContinue() {
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
}
} else if req.Header.get("Expect") != "" {
w.sendExpectationFailed()
return
}
c.curReq.Store(w)
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}
创建一个serverHandler处理当前的请求rw,serverHandler就检测Server是否设置了默认处理者,和响应Option方法,可忽略。
然后判断连接状态是否劫持,劫持直接结束。
...
注意:serverHandler{c.server}.ServeHTTP(w, w.req),就是用连接先创建request和response对象,然使用http.Handler对象来处理这个请求。
serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
c.setState(c.rwc, StateIdle)
c.curReq.Store((*response)(nil))
http.serverHandler定义:
type serverHandler struct {
srv *Server
}
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if req.RequestURI == "*" && req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
handler.ServeHTTP(rw, req)
}
...
if !w.conn.server.doKeepAlives() {
return
}
if d := c.server.idleTimeout(); d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
if _, err := c.bufr.Peek(4); err != nil {
return
}
}
c.rwc.SetReadDeadline(time.Time{})
net.conn.serve完整定义如下:
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
defer func() {
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
if !c.hijacked() {
c.close()
c.setState(c.rwc, StateClosed)
}
}()
if tlsConn, ok := c.rwc.(*tls.Conn); ok {
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
if err := tlsConn.Handshake(); err != nil {
if re, ok := err.(tls.RecordHeaderError); ok && re.Conn != nil && tlsRecordHeaderLooksLikeHTTP(re.RecordHeader) {
io.WriteString(re.Conn, "HTTP/1.0 400 Bad Request\r\n\r\nClient sent an HTTP request to an HTTPS server.\n")
re.Conn.Close()
return
}
c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
return
}
c.tlsState = new(tls.ConnectionState)
*c.tlsState = tlsConn.ConnectionState()
if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
if fn := c.server.TLSNextProto[proto]; fn != nil {
h := initNPNRequest{tlsConn, serverHandler{c.server}}
fn(c.server, tlsConn, h)
}
return
}
}
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
for {
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
c.setState(c.rwc, StateActive)
}
if err != nil {
const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"
if err == errTooLarge {
const publicErr = "431 Request Header Fields Too Large"
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
c.closeWriteAndWait()
return
}
if isCommonNetReadError(err) {
return
}
publicErr := "400 Bad Request"
if v, ok := err.(badRequestError); ok {
publicErr = publicErr + ": " + string(v)
}
fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
return
}
req := w.req
if req.expectsContinue() {
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
}
} else if req.Header.get("Expect") != "" {
w.sendExpectationFailed()
return
}
c.curReq.Store(w)
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}
serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest()
if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
c.setState(c.rwc, StateIdle)
c.curReq.Store((*response)(nil))
if !w.conn.server.doKeepAlives() {
return
}
if d := c.server.idleTimeout(); d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
if _, err := c.bufr.Peek(4); err != nil {
return
}
}
c.rwc.SetReadDeadline(time.Time{})
}
}
net.conn.readRequest
readRequest方法就是根据连接创建http.Request和http.ResponseWriter两个对象供http.Handler接口使用,处理一个请求。
end
创建一个Server并处理http请求到此就结束。
http.HandleFunc
http.HandleFunc使用http.DefaultServeMux这个默认路由调用HandleFunc方法注册一个路由。
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
DefaultServeMux.HandleFunc(pattern, handler)
}
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request))
http.Handler
http.Handler是net/http库处理请求的接口,http.Server直接调用Handler处理请求。
http.ServeMux是net/http库内置的路由器,执行了基本匹配,但是实现了http.Handler接口,Server就直接使用Mux。
HandlerFunc是处理函数,但是这个类型实现了http.Handler接口,就将一个函数转换成了接口。
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
type HandlerFunc func(ResponseWriter, *Request)
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}
一下两种方法都将func(ResponseWriter, *Request){}
这样一个函数转换成了http.Handler接口
http.HandlerFunc(func(ResponseWriter, *Request){})
func Convert(h http.HandlerFunc) http.Handler {
return h
}
Middleware
基于net/http简单实现中间件,使用http.Handler接口,使用装饰器模式嵌套一层。
Logger对象实现了http.Handler接口,会先输出请求信息,然后调用路由处理这个请求。
http.ServeMux是标准库实现的路由器,会匹配并处理请求。
package main
import "fmt"
import "net/http"
func main() {
mux := &http.ServeMux{}
mux.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %q", r.URL.Path)
})
http.ListenAndServe(":8080", &Logger{mux})
}
type Logger struct {
h http.Handler
}
func (log *Logger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Printf("%s %s\n", r.Method, r.URL.Path)
log.h.ServeHTTP(w, r)
}
Last updated
Was this helpful?