In Web services, in addition to the actual business code, often need to achieve a unified record of request logs, rights management or exception handling and other functions, these in the web framework Gin or Django can be achieved through middleware, while gRPC can use interceptor, rpc request or response to intercept processing.
gRPC server and client can implement their own interceptor, according to the two types of rpc requests can be divided into two kinds.
- Unary Interceptor
- Stream Interceptor
Unary Interceptor
For Unary Server Interceptor, just define the UnaryServerInterceptor
method, where handler(ctx, req)
is to call the rpc method.
1
2
3
4
5
6
7
8
|
type UnaryServerInterceptor func(
ctx context.Context,
req interface{},
info *UnaryServerInfo,
handler UnaryHandler
) (interface{}, error){
return handler(ctx, req)
}
|
For the Unary Client Interceptor, you need to define a method UnaryClientInterceptor
, which executes invoker()
to actually request the rpc.
1
2
3
4
5
6
7
8
9
10
11
|
type UnaryClientInterceptor func(
ctx context.Context,
method string,
req,
reply interface{},
cc *ClientConn,
invoker UnaryInvoker,
opts ...CallOption
) error {
return invoker(ctx, method, req, reply, cc, opts...)
}
|
The implementation of the Unary Interceptor can be divided into three parts depending on before and after the call to the handler or invoker: pre-processing before the call, calling the rpc method, and post-processing after the call.
Streaming Interceptor
The implementation of the stream interceptor is consistent with the Unary Interceptor, just implement the methods provided, and the method parameters mean the following.
1
2
3
4
5
6
7
8
|
type StreamServerInterceptor func(
srv interface{}, // rpc请求参数
ss ServerStream, // 服务端stream对象
info *StreamServerInfo, // rpc方法信息
handler StreamHandler // rpc方法本身
) (err error){
return handler(src, ss)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
type StreamClientInterceptor func(
ctx context.Context,
desc *StreamDesc,
cc *ClientConn,
method string,
streamer Streamer,
opts ...CallOption // options
)(ClientStream, error){
// Stream operation pre-processing
clientStream, err := streamer(ctx, desc, cc, method, opts...)
// Intercept stream operations via clientStream based on certain conditions
return clientStream, err
}
|
Unlike other interceptors, the implementation of client-side stream interceptor is divided into two parts, stream operation pre-processing and stream operation interception, which cannot be called and post-processed by rpc methods afterwards, but can only intercept stream operations through ClientStream objects, such as calling ClientStream.CloseSend()
according to a specific metadata to terminate the stream operation.
Example
Here we will write a demo of each of the above four interceptors simply outputting the request log to see the actual effect.
The demo directory structure is as follows.
1
2
3
4
5
6
7
8
9
10
11
|
rpc
├── base.proto
├── base
│ ├── base.pb.go
│ └── base_grpc.pb.go
├── client
│ └── main.go
├── server
│ └── main.go
├── go.mod
├── go.sum
|
The base.proto file is as follows.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
syntax = "proto3";
package proto;
option go_package = "base;base";
service BaseService {
rpc GetTime (TimeRequest) returns (TimeResponse){}
rpc Streaming (stream StreamRequest) returns (stream StreamResponse){}
}
message TimeRequest {}
message TimeResponse {
string time = 1;
}
message StreamRequest{
string input = 1;
}
message StreamResponse{
string output = 1;
}
|
Execute the command protoc --go_out=. --go-grpc_out=. base.prto
to generate the corresponding pb file.
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package main
import (
"context"
"log"
"io"
"net"
"strconv"
"time"
pb "rpc/base"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type service struct {
pb.UnimplementedBaseServiceServer
}
func main() {
listen, err := net.Listen("tcp", ":50051")
if err != nil {
fmt.Println(err)
}
s := grpc.NewServer(grpc.UnaryInterceptor(UnaryServerInterceptor), grpc.StreamInterceptor(StreamServerInterceptor))
reflection.Register(s)
pb.RegisterBaseServiceServer(s, &service{})
s.Serve(listen)
}
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println("start unary")
resp, err = handler(ctx, req)
log.Printf("end unary %v\n", resp)
return resp, err
}
func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
log.Println("before stream")
err := handler(srv, ss)
log.Println("after stream")
return err
}
// Specific implementations
func (s *service) GetTime(ctx context.Context, in *pb.TimeRequest) (*pb.TimeResponse, error) {
now := time.Now().Format("2006-01-02 15:04:05")
return &pb.TimeResponse{Time: now}, nil
}
func (s *service) Streaming(stream pb.BaseService_StreamingServer) error {
for n := 0; ; {
res, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
v, _ := strconv.Atoi(res.Input)
log.Println(v)
n += v
stream.Send(&pb.StreamResponse{Output: strconv.Itoa(n)})
}
}
|
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
package main
import (
"context"
"io"
"log"
"strconv"
pb "rpc/base"
"google.golang.org/grpc"
)
func main() {
conn, err := grpc.Dial(":50051", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(UnaryClientInterceptor), grpc.WithStreamInterceptor(StreamClientInterceptor))
if err != nil {
log.Println(err)
}
defer conn.Close()
c := pb.NewBaseServiceClient(conn)
// Execute unary, stream in sequence
_, err = c.GetTime(context.Background(), &pb.TimeRequest{})
if err != nil {
log.Fatal(err)
}
print("===============\n")
streaming(c)
}
func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
log.Println("before unary")
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("end unary %v\n", reply)
return err
}
func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println("before stream")
clientStream, err := streamer(ctx, desc, cc, method, opts...)
log.Println("check metadata")
return clientStream, err
}
func streaming(client pb.BaseServiceClient) error {
stream, _ := client.Streaming(context.Background())
for n := 0; n < 10; n++ {
log.Println("Streaming Send:", n)
err := stream.Send(&pb.StreamRequest{Input: strconv.Itoa(n)})
if err != nil {
return err
}
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Println("Streaming Recv:", res.Output)
}
stream.CloseSend()
return nil
}
|
Execute go run server/main.go
and go run client/main.go
in order in the rpc directory, and the output will be as follows.
It can be clearly seen that StreamClientInterceptor outputs the log twice at the beginning of stream processing, while the remaining three interceptors output twice before and after the request.
Summary
If multiple interceptors need to be used, the corresponding four linkers are provided in grpc-go to concatenate multiple interceptors.
grpc.ChainUnaryInterceptor(i ...UnaryServerInterceptor)
grpc.ChainStreamInterceptor(i ...StreamServerInterceptor)
grpc.WithChainUnaryInterceptor(i ...UnaryClientInterceptor)
grpc.WithChainStreamInterceptor(i ...StreamClientInterceptor)
If the grpc version is too old and may not yet provide a chain api, you can use the third-party library grpc-ecosystem/go-grpc-middleware. In addition to the linker, the library also provides many common interceptors, such as grpc_zap, grpc_recovery, etc. Of course, special requirements can also be implemented by implementing the corresponding methods to achieve a custom interceptor.