We often see the following log line:

rpc error: code = DeadlineExceeded desc = context deadline exceeded

Two questions need answering: 1. Where does this error come from? 2. How is the timeout set, and how does it take effect?

Let's start with the first question. The desc text of this error comes from the Go standard library, in src/context/context.go:

```go
var DeadlineExceeded error = deadlineExceededError{}

type deadlineExceededError struct{}

func (deadlineExceededError) Error() string { return "context deadline exceeded" }
```

When is this error returned? That is also in the context package of the Go source:

```go
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	// ... (c, a *timerCtx wrapping parent with deadline d, is created here)
	dur := time.Until(d)
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	// ...
	if c.err == nil {
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}
```

With this background, we can track down where the grpc-go client calls WithTimeout.

google.golang.org/grpc@v1.50.1/clientconn.go

```go
type ClientConn struct {
	// ...
	dopts dialOptions
	// ...
}

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	// ... (cc is created and opts are applied to cc.dopts)
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// ...
}
```

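As you can see, a timeout is applied when the connection is established: if the server does not respond within it, the error above is reported. Note that grpc.Dial and DialContext do not block by default, so this dial timeout only takes effect together with grpc.WithBlock().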

The second place is when a request is sent. The client first sets up a stream for the call:

```go
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	// ...
}

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	// ...
	var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
	}
	// ...
	rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
	// ... (mc is taken from rpcConfig.MethodConfig)
}

func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	// ...
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	// ...
}
```

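As you can see, if a timeout is configured for the method (in the service config), the call also fails with this error when no response arrives before that timeout expires.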

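Are there other places where a timeout can be configured? Yes: we can also define one in an interceptor. Below are two commonly used ways to set a timeout, one at the connection level and one per call.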

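```go
clientConn, err := grpc.Dial(serverAddress, grpc.WithTimeout(5*time.Second), grpc.WithInsecure())
if err != nil {
	log.Println("Dial failed!")
	return err
}
// WithForcedTimeout only has an effect if TimeoutInterceptor (defined below)
// is installed on the connection, e.g. via grpc.WithUnaryInterceptor.
c := pb.NewGreeterClient(clientConn)
c.SayHello(context.Background(), &pb.HelloRequest{Name: "world"}, WithForcedTimeout(time.Duration(10)*time.Second))
```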

So how do these settings take effect, and how are they passed to the server? Let's start with grpc.WithTimeout, whose source is in google.golang.org/grpc@v1.50.1/dialoptions.go:

```go
func WithTimeout(d time.Duration) DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.timeout = d
	})
}
```

It sets the timeout field of dialOptions:

```go
type dialOptions struct {
	// ...
	timeout time.Duration
	// ...
}

type DialOption interface {
	apply(*dialOptions)
}
```

and dialOptions is a field of ClientConn:

```go
type ClientConn struct {
	dopts dialOptions
	// ...
}
```

When dialing, DialContext uses the timeout stored in that field:

```go
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	// ...
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// ...
}
```

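How does the interceptor make its timeout take effect? The logic is even simpler. Look at its definition: the interceptor runs before the actual call is made, and that is where the timeout is set: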

```go
func TimeoutInterceptor(t time.Duration) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		// A per-call forced timeout overrides the default t.
		timeout := t
		if v, ok := getForcedTimeout(opts); ok {
			timeout = v
		}
		if timeout <= 0 {
			return invoker(ctx, method, req, reply, cc, opts...)
		}

		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()

		return invoker(ctx, method, req, reply, cc, opts...)
	}
}

func getForcedTimeout(callOptions []grpc.CallOption) (time.Duration, bool) {
	for _, opt := range callOptions {
		if co, ok := opt.(TimeoutCallOption); ok {
			return co.forcedTimeout, true
		}
	}
	return 0, false
}
```

The timeout value itself is passed down as a call option when the call is made:

```go
type TimeoutCallOption struct {
	grpc.EmptyCallOption
	forcedTimeout time.Duration
}

func WithForcedTimeout(forceTimeout time.Duration) TimeoutCallOption {
	return TimeoutCallOption{forcedTimeout: forceTimeout}
}
```

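Now that we know how the client-side timeout is set and takes effect, how does the server make sure it stops the current task as soon as the client has timed out? Before answering that, let's look at how the timeout is propagated. The short answer: the gRPC protocol carries the timeout in an HTTP/2 request header. With a 5-second timeout set on the client, the header looks like this: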

grpc-timeout: 4995884u

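Here the unit u means microseconds, not nanoseconds: the protocol defines H, M, S, m, u and n for hours, minutes, seconds, milliseconds, microseconds and nanoseconds. So 4995884u is about 4.996 s. It is slightly under 5 seconds because part of the budget had already elapsed when the header was written. When the server receives the request, it uses this header value to work out when the request times out.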

So when does the client set this header, and when does the server parse it?

google.golang.org/grpc@v1.50.1/internal/transport/http2_client.go

When the client starts a request, it calls:

```go
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
	// ...
	headerFields, err := t.createHeaderFields(ctx, callHdr)
	// ...
}
```

Inside it, we can see that the deadline is taken from the context and written into the "grpc-timeout" header:

```go
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
	// ...
	if dl, ok := ctx.Deadline(); ok {
		// Send out timeout regardless its value. The server can detect timeout context by itself.
		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
		timeout := time.Until(dl)
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
	}
	// ...
}
```

On the server side, the header is parsed here:

google.golang.org/grpc@v1.50.1/internal/transport/handler_server.go

```go
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
	// ...
	if v := r.Header.Get("grpc-timeout"); v != "" {
		to, err := decodeTimeout(v)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
		}
		st.timeoutSet = true
		st.timeout = to
	}
	// ...
}

// The native HTTP/2 transport (http2_server.go, operateHeaders) derives the
// stream context from its parsed timeout in the same way:
if timeoutSet {
	s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
} else {
	s.ctx, s.cancel = context.WithCancel(t.ctx)
}
```

As you can see, the timeout is first read from the header and then applied with context.WithTimeout:

```go
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
	// ...
	if ht.timeoutSet {
		ctx, cancel = context.WithTimeout(ctx, ht.timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	// ...
}
```

google.golang.org/grpc@v1.50.1/internal/transport/http2_server.go

```go
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
	// ...
	for _, hf := range frame.Fields {
		switch hf.Name {
		// ...
		case "grpc-timeout":
			timeoutSet = true
			var err error
			if timeout, err = decodeTimeout(hf.Value); err != nil {
				headerError = true
			}
		// ...
		}
	}
	// ...
}
```