grpc中的拦截器

0.1、索引

https://waterflow.link/articles/1665853719750

当我们编写 HTTP 应用程序时,您可以使用 HTTP 中间件包装特定于路由的应用程序处理程序,可以在执行应用程序处理程序之前和之后执行一些常见的逻辑。 我们通常使用中间件来编写跨领域组件,例如授权、日志记录、缓存等。在 gRPC 中可以使用称为拦截器的概念来实现相同的功能。

通过使用拦截器,我们可以在客户端和服务器上拦截 RPC 方法的执行。 在客户端和服务器上,都有两种类型的拦截器:

  • UnaryInterceptor(一元拦截器)
  • StreamInterceptor(流式拦截器)

UnaryInterceptor 拦截一元 RPC,而 StreamInterceptor 拦截流式 RPC。

在一元 RPC 中,客户端向服务器发送单个请求并返回单个响应。 在流式 RPC 中,客户端或服务器,或双方(双向流式传输),获取一个流读取一系列消息返回,然后客户端或服务器从返回的流中读消息,直到没有更多消息为止。

1、在 gRPC 客户端中编写拦截器

我们可以在 gRPC 客户端应用程序中编写两种类型的拦截器:

  • UnaryClientInterceptor:UnaryClientInterceptor 拦截客户端上一元 RPC 的执行。
  • StreamClientInterceptor:StreamClientInterceptor拦截ClientStream的创建。 它可能会返回一个自定义的 ClientStream 来拦截所有 I/O 操作。

1、UnaryClientInterceptor

为了创建 UnaryClientInterceptor,可以通过提供 UnaryClientInterceptor 函数值调用 WithUnaryInterceptor 函数,该函数返回一个 grpc.DialOption 指定一元 RPC 的拦截器:

func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

然后将返回的 grpc.DialOption 值用作调用 grpc.Dial 函数以将拦截器应用于一元 RPC 的参数。

UnaryClientInterceptor func 类型的定义如下:

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts …CallOption) error

参数调用者是完成 RPC 的处理程序,调用它是拦截器的责任。 UnaryClientInterceptor 函数值提供拦截器逻辑。 这是一个实现 UnaryClientInterceptor 的示例拦截器:

clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        start := time.Now().Unix()
        err := invoker(ctx, method, req, reply, cc, opts...)
        end := time.Now().Unix()
    // 获取调用grpc的请求时长
        fmt.Println("request time duration: ", end - start)
        return err
    }

下面的函数返回一个 grpc.DialOption 值,它通过提供 UnaryClientInterceptor 函数值来调用 WithUnaryInterceptor 函数:

func WithUnaryInterceptorCustom() grpc.DialOption {
  clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        start := time.Now().Unix()
        err := invoker(ctx, method, req, reply, cc, opts...)
        end := time.Now().Unix()
    // 获取调用grpc的请求时长
        fmt.Println("request time duration: ", end - start)
        return err
    }
    return grpc.WithUnaryInterceptor(clientInterceptor)
}

返回的 grpc.DialOption 值用作调用 grpc.Dial 函数以应用拦截器的参数:

conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), WithUnaryInterceptorCustom())

搭建简单grpc服务可以参考这篇文章:https://waterflow.link/articles/1665674508275

2、StreamClientInterceptor

为了创建 StreamClientInterceptor,可以通过提供 StreamClientInterceptor 函数值调用 WithStreamInterceptor 函数,该函数返回一个 grpc.DialOption 指定流 RPC 的拦截器:

func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
    return newFuncDialOption(func(o *dialOptions) {
        o.streamInt = f
    })
}

然后将返回的 grpc.DialOption 值用作调用 grpc.Dial 函数的参数,以将拦截器应用于流式 RPC。

下面是 StreamClientInterceptor func 类型的定义:

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

下面是StreamClientInterceptor的具体实现:

// 包装下stream
// 结构体内嵌接口,初始化的时候需要赋值对象实现了该接口的所有方法
type wrappedStream struct {
    grpc.ClientStream
}

// 实现接收消息方法,并自定义打印
func (w *wrappedStream) RecvMsg(m interface{}) error  {
    log.Printf("====== [Client Stream Interceptor] " +
        "Receive a message (Type: %T) at %v",
        m, time.Now().Format(time.RFC3339))
    return w.ClientStream.RecvMsg(m)
}

// 实现发送消息方法,并自定义打印
func (w *wrappedStream) SendMsg(m interface{}) error {
    log.Printf("====== [Client Stream Interceptor] " +
        "Send a message (Type: %T) at %v",
        m, time.Now().Format(time.RFC3339))
    return w.ClientStream.SendMsg(m)
}

// 初始化包装stream
func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
    return &wrappedStream{s}
}

func WithStreamInterceptorCustom() grpc.DialOption {
    clientInterceptor := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
        clientStream, err := streamer(ctx, desc, cc, method, opts...)
        if err != nil {
            return nil, err
        }
    // 返回包装后的stream
    // 这里clientStream实现了grpc.ClientStream接口
        return newWrappedStream(clientStream), err
    }
    return grpc.WithStreamInterceptor(clientInterceptor)
}

这里需要注意:

  1. 定义一个包装stream结构体wrappedStream,这里用到了结构体内嵌接口的方式,直接实现了接口的所有方法,具体可以看注释
  2. 重写RecvMsg和SendMsg方法
  3. WithStreamInterceptorCustom拦截器中染回包装后的clientStream

为了将 StreamClientInterceptor 应用于流式 RPC,只需将 WithStreamInterceptor 函数返回的 grpc.DialOption 值作为调用 grpc.Dial 函数的参数传递。 您可以将 UnaryClientInterceptor 和 StreamClientInterceptor 值传递给 grpc.Dial 函数。

conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), WithUnaryInterceptorCustom(), WithStreamInterceptorCustom())

完整的客户端代码像,下面这样:

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpcdemo/helloservice"
    "io"
    "log"
    "time"
)

func main() {
    // 连接grpc服务端,加入拦截器
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), WithUnaryInterceptorCustom(), WithStreamInterceptorCustom())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

  // 一元rpc
    unaryRpc(conn)
  // 流式rpc
    streamRpc(conn)

}

// 一元拦截器
func WithUnaryInterceptorCustom() grpc.DialOption {
    clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        start := time.Now().Unix()
        err := invoker(ctx, method, req, reply, cc, opts...)
        end := time.Now().Unix()
        fmt.Println("invoker request time duration: ", end - start)
        return err
    }
    return grpc.WithUnaryInterceptor(clientInterceptor)
}

type wrappedStream struct {
    grpc.ClientStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error  {
    log.Printf("====== [Client Stream Interceptor] " +
        "Receive a message (Type: %T) at %v",
        m, time.Now().Format(time.RFC3339))
    return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
    log.Printf("====== [Client Stream Interceptor] " +
        "Send a message (Type: %T) at %v",
        m, time.Now().Format(time.RFC3339))
    return w.ClientStream.SendMsg(m)
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
    return &wrappedStream{s}
}

// 流式拦截器
func WithStreamInterceptorCustom() grpc.DialOption {
    clientInterceptor := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
        clientStream, err := streamer(ctx, desc, cc, method, opts...)
        if err != nil {
            return nil, err
        }
        return newWrappedStream(clientStream), err
    }
    return grpc.WithStreamInterceptor(clientInterceptor)
}

func unaryRpc(conn *grpc.ClientConn) {
    client := helloservice.NewHelloServiceClient(conn)
    reply, err := client.Hello(context.Background(), &helloservice.String{Value: "hello"})
    if err != nil {
        log.Fatal(err)
    }
    log.Println("unaryRpc recv: ", reply.Value)
}

func streamRpc(conn *grpc.ClientConn) {
    client := helloservice.NewHelloServiceClient(conn)
    stream, err := client.Channel(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    go func() {
        for {
            if err := stream.Send(&helloservice.String{Value: "hi"}); err != nil {
                log.Fatal(err)
            }
            time.Sleep(time.Second)
        }
    }()

    for {
        recv, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Fatal(err)
        }

        fmt.Println("streamRpc recv: ", recv.Value)

    }
}

可以结合我之前的文章,把本期的代码加进去运行调试

(搭建简单grpc服务可以参考这篇文章:https://waterflow.link/articles/1665674508275)

运行效果如下:

go run helloclient/main.go
invoker request time duration:  1
2022/10/14 23:17:35 unaryRpc recv:  hello:hello
2022/10/14 23:17:35 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:35+08:00
2022/10/14 23:17:35 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:35+08:00
2022/10/14 23:17:36 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:36+08:00
streamRpc recv:  hello:hi
2022/10/14 23:17:36 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:36+08:00
2022/10/14 23:17:37 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:37+08:00
streamRpc recv:  hello:hi
2022/10/14 23:17:37 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:37+08:00
2022/10/14 23:17:38 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:38+08:00
streamRpc recv:  hello:hi
2022/10/14 23:17:38 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:38+08:00

2、在 gRPC 客户端中编写拦截器

和 gRPC 客户端应用程序一样,gRPC 服务器应用程序提供两种类型的拦截器:

  • UnaryServerInterceptor:提供了一个钩子来拦截服务器上一元 RPC 的执行。
  • StreamServerInterceptor:提供了一个钩子来拦截服务器上流式 RPC 的执行。

1、UnaryServerInterceptor

为了创建 UnaryServerInterceptor,可以通过提供 UnaryServerInterceptor 函数值作为参数调用 UnaryInterceptor 函数,该参数返回为服务器设置 UnaryServerInterceptor 的 grpc.ServerOption 值。

func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        if o.unaryInt != nil {
            panic("The unary server interceptor was already set and may not be reset.")
        }
        o.unaryInt = i
    })
}

然后使用返回的 grpc.ServerOption 值作为参数提供给 grpc.NewServer 函数以注册为 UnaryServerInterceptor。

UnaryServerInterceptor 函数的定义如下:

func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)

参数 info 包含了这个 RPC 的所有信息,拦截器可以对其进行操作。 而handler是服务方法实现的包装器。 拦截器负责调用处理程序来完成 RPC。

1、定义一个服务端的鉴权一元拦截器
func ServerUnaryInterceptorCustom() grpc.ServerOption {
    serverInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        start := time.Now()

    // 如果是非登录请求,需要验证token
        if info.FullMethod != "/helloservice.HelloService/Login" {
            if err := authorize(ctx); err != nil {
                return nil, err
            }
        }

        h, err := handler(ctx, req)

        log.Printf("Request - Method:%s\tDuration:%s\tError:%v\n",
            info.FullMethod,
            time.Since(start),
            err)
        return h, err
    }
    return grpc.UnaryInterceptor(serverInterceptor)
}

// authorize 从Metadata中获取token并校验是否合法
func authorize(ctx context.Context) error {
  // 从context中获取metadata
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return status.Errorf(codes.InvalidArgument, "Retrieving metadata is failed")
    }

    authHeader, ok := md["authorization"]
    if !ok {
        return status.Errorf(codes.Unauthenticated, "Authorization token is not supplied")
    }

    token := authHeader[0]
    // validateToken function validates the token
    err := validateToken(token)

    if err != nil {
        return status.Errorf(codes.Unauthenticated, err.Error())
    }
    return nil
}

func validateToken(token string) error {
    // 校验token
    return nil
}

我们可以看下我们定义的一元拦截器的执行流程:

  1. 首先进来之后我们判断如果是登录请求,直接转发请求,并打印日志
  2. 如果是非登录请求,需要验证token,调用authorize方法
  3. 在authorize方法中,会从context中获取metadata元数据,然后解析获取token并验证

请注意,前面代码块中的拦截器逻辑使用包 google.golang.org/grpc/codes 和 google.golang.org/grpc/status。

2、grpc客户端传入token

gRPC 支持在客户端和服务器之间使用 Context 值发送元数据。 google.golang.org/grpc/metadata 包提供了元数据的功能。

其中MD类型是一个k-v的map,想下面这样

type MD map[string][]string

下面我们在客户端编写向服务端发送token的代码,我们修改下客户端的unaryRpc,构造包含authorization的metadata:

func unaryRpc(conn *grpc.ClientConn) {
    client := helloservice.NewHelloServiceClient(conn)
    ctx := context.Background()
  // 构造元数据,并返回MD类型的结构
    md := metadata.Pairs("authorization", "mytoken")
  // 元数据塞入context并返回新的context
    ctx = metadata.NewOutgoingContext(ctx, md)
    reply, err := client.Hello(ctx, &helloservice.String{Value: "hello"})
    if err != nil {
        log.Fatal(err)
    }
    log.Println("unaryRpc recv: ", reply.Value)
}

这样元数据的信息就会跟着context发送到grpc服务端。

接着我们在服务端grpc中修改如下代码,加入一行日志:

func validateToken(token string) error {
    log.Printf("get the token: %s \n", token)
    // 校验token
    return nil
}
3、运行服务

我们重新执行下grpc服务端程序,然后运行下客户端代码,可以看到token传过来了:

go run helloservice/main/main.go
2022/10/15 20:36:04 server started...
2022/10/15 20:36:08 get the token: mytoken 
2022/10/15 20:36:09 Request - Method:/helloservice.HelloService/Hello   Duration:1.001216763s   Error:<nil>

2、StreamServerInterceptor

为了创建 StreamServerInterceptor,通过提供 StreamServerInterceptor func 值作为参数调用 StreamInterceptor 函数,该参数返回为服务器设置 StreamServerInterceptor 的 grpc.ServerOption 值。

func StreamInterceptor(i StreamServerInterceptor) ServerOption {
    return newFuncServerOption(func(o *serverOptions) {
        if o.streamInt != nil {
            panic("The stream server interceptor was already set and may not be reset.")
        }
        o.streamInt = i
    })
}

然后使用返回的 grpc.ServerOption 值作为参数提供给 grpc.NewServer 函数以注册为 UnaryServerInterceptor。

下面是 StreamServerInterceptor func 类型的定义:

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

我们看下服务端流式拦截器的具体例子:

type wrappedStream struct {
    grpc.ServerStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error  {
    log.Printf("====== [Server Stream Interceptor] " +
        "Receive a message (Type: %T) at %v",
        m, time.Now().Format(time.RFC3339))
    return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
    log.Printf("====== [Server Stream Interceptor] " +
        "Send a message (Type: %T) at %v",
        m, time.Now().Format(time.RFC3339))
    return w.ServerStream.SendMsg(m)
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
    return &wrappedStream{s}
}

func ServerStreamInterceptorCustom() grpc.ServerOption {
    serverInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        return handler(srv, newWrappedStream(ss))
    }
    return grpc.StreamInterceptor(serverInterceptor)
}

上面服务端流式拦截器代码可参考客户端流式拦截器的代码,基本差不多。

3、多拦截器

go-grpc在v1.28.0之前是不支持多个拦截器。但是可以使用一些第三方的包,拦截器链接允应用多个拦截器。

v1.28.0之后已经可以支持多个拦截器,我们修改下服务端代码如下:

...

unaryInterceptors := []grpc.ServerOption {
        ServerUnaryInterceptorCustom(),
        ServerStreamInterceptorCustom(),
    }
grpcServer := grpc.NewServer(unaryInterceptors...)

...
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容