HTTP/2 is built on two concepts: frames and streams. A frame is the smallest unit of transmission in HTTP/2, and a request or response is usually split into one or more frames. A stream is a virtual channel inside an established connection, and it carries the frames of one request/response exchange. Each frame contains a Stream Identifier that names the stream it belongs to. This is how HTTP/2 achieves multiplexing: requests to the same domain share a single connection, and the Stream Identifier tells their interleaved frames apart, which reduces connection overhead.
gRPC runs over HTTP/2, so it supports streaming naturally. gRPC offers three types of streaming:
- server streaming
- client streaming
- bidirectional streaming
This article focuses on how to implement the three types of gRPC streaming.
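To make the frame and Stream Identifier from the introduction concrete, here is a minimal sketch that decodes the fixed 9-byte frame header described in the HTTP/2 specification: a 24-bit payload length, an 8-bit type, 8-bit flags, one reserved bit, and a 31-bit Stream Identifier. The names `FrameHeader` and `parseFrameHeader` are illustrative assumptions; they are not part of gRPC or of any Go library.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// FrameHeader is a hypothetical struct mirroring the 9-byte HTTP/2 frame header.
type FrameHeader struct {
	Length   uint32 // payload length, 24 bits on the wire
	Type     uint8  // frame type, e.g. 0x0 DATA, 0x1 HEADERS
	Flags    uint8  // type-specific flags
	StreamID uint32 // 31 bits; the top bit is reserved
}

// parseFrameHeader decodes the first 9 bytes of b.
func parseFrameHeader(b []byte) (FrameHeader, error) {
	if len(b) < 9 {
		return FrameHeader{}, errors.New("frame header needs 9 bytes")
	}
	return FrameHeader{
		Length:   uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2]),
		Type:     b[3],
		Flags:    b[4],
		StreamID: binary.BigEndian.Uint32(b[5:9]) & 0x7fffffff, // mask the reserved bit
	}, nil
}

func main() {
	// A DATA frame (type 0x0) with the END_STREAM flag (0x1),
	// 5 payload bytes, on stream 3.
	raw := []byte{0x00, 0x00, 0x05, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03}
	h, err := parseFrameHeader(raw)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("length=%d type=%d flags=%d stream=%d\n", h.Length, h.Type, h.Flags, h.StreamID)
}
```

Frames from different streams can be interleaved on one connection, and the receiver uses `StreamID` to reassemble each request or response.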
Proto
Adding the keyword stream before the request or response message type marks that side of the RPC as a stream. base.proto is shown below.
syntax = "proto3";

package proto;

option go_package = "base;base";

service BaseService {
	// Server streaming: one request, a stream of responses
	rpc ServerStream (StreamRequest) returns (stream StreamResponse) {}
	// Client streaming: a stream of requests, one response
	rpc ClientStream (stream StreamRequest) returns (StreamResponse) {}
	// Bidirectional streaming: both sides stream
	rpc Streaming (stream StreamRequest) returns (stream StreamResponse) {}
}

message StreamRequest {
	string input = 1;
}

message StreamResponse {
	string output = 1;
}
Run protoc --go_out=. --go-grpc_out=. base.proto to generate the corresponding Go stubs.
Basic Code
server.go
package main

// io and strconv are needed by the handlers implemented in the following
// sections; until those are filled in, the compiler reports them as unused.
import (
	"fmt"
	"io"
	"net"
	"strconv"

	pb "rpc/base"

	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

type service struct {
	pb.UnimplementedBaseServiceServer
}

func main() {
	listen, err := net.Listen("tcp", ":50051")
	if err != nil {
		fmt.Println(err)
		return
	}
	s := grpc.NewServer()
	reflection.Register(s)
	pb.RegisterBaseServiceServer(s, &service{})
	// Serve blocks until the listener fails or the server is stopped.
	if err := s.Serve(listen); err != nil {
		fmt.Println(err)
	}
}

func (s *service) ClientStream(stream pb.BaseService_ClientStreamServer) error {
	return nil
}

func (s *service) ServerStream(in *pb.StreamRequest, stream pb.BaseService_ServerStreamServer) error {
	return nil
}

func (s *service) Streaming(stream pb.BaseService_StreamingServer) error {
	return nil
}
client.go
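package main

// context, io and strconv are needed by the functions implemented in the
// following sections; until those are filled in, the compiler reports them as unused.
import (
	"context"
	"fmt"
	"io"
	"strconv"

	pb "rpc/base"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// grpc.WithInsecure() is deprecated; insecure.NewCredentials() replaces it.
	conn, err := grpc.Dial(":50051", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Close()
	c := pb.NewBaseServiceClient(conn)

	// Run the three examples in turn.
	if err := serverStream(c, &pb.StreamRequest{Input: "something"}); err != nil {
		fmt.Println(err)
	}
	if err := clientStream(c, "something"); err != nil {
		fmt.Println(err)
	}
	if err := streaming(c); err != nil {
		fmt.Println(err)
	}
}

func clientStream(client pb.BaseServiceClient, input string) error {
	return nil
}

func serverStream(client pb.BaseServiceClient, r *pb.StreamRequest) error {
	return nil
}

func streaming(client pb.BaseServiceClient) error {
	return nil
}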
Server Stream
A unary gRPC call returns exactly one response per request. With Server Stream, the server can send multiple StreamResponse messages by calling stream.Send() multiple times.
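// Server
func (s *service) ServerStream(in *pb.StreamRequest, stream pb.BaseService_ServerStreamServer) error {
	input := in.Input
	for i := 0; i < len(input); i++ {
		output := fmt.Sprintf("index: %d, result: %s", i, string(input[i]))
		// Each Send delivers one StreamResponse; stop if the stream is broken.
		if err := stream.Send(&pb.StreamResponse{Output: output}); err != nil {
			return err
		}
	}
	// Returning nil ends the stream; the client then gets io.EOF from Recv.
	return nil
}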
The client code is as follows. It calls stream.Recv() in a for loop; each call blocks until the server sends a response, an error occurs, or the stream ends (io.EOF).
// Client
func serverStream(client pb.BaseServiceClient, r *pb.StreamRequest) error {
	fmt.Println("Server Stream Send:", r.Input)
	stream, err := client.ServerStream(context.Background(), r)
	if err != nil {
		return err
	}
	for {
		res, err := stream.Recv()
		if err == io.EOF {
			// The server finished sending.
			break
		}
		if err != nil {
			return err
		}
		fmt.Println("Server Stream Recv:", res.Output)
	}
	return nil
}
Start the server, then have the client call serverStream(c, &pb.StreamRequest{Input: "something"}). The output is as follows.
Server Stream Send: something
Server Stream Recv: index: 0, result: s
Server Stream Recv: index: 1, result: o
Server Stream Recv: index: 2, result: m
Server Stream Recv: index: 3, result: e
Server Stream Recv: index: 4, result: t
Server Stream Recv: index: 5, result: h
Server Stream Recv: index: 6, result: i
Server Stream Recv: index: 7, result: n
Server Stream Recv: index: 8, result: g
It is easy to see that the client requests once and the server returns the data multiple times, thus implementing Server Stream.
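The receive loop above depends on one contract: Recv() returns io.EOF once the sender has finished, and any other error means the stream failed. The sketch below isolates that loop without gRPC, so it runs with the standard library alone. `fakeStream`, `receiver` and `drain` are hypothetical names used only for illustration; the fake stands in for the generated client stream and is not gRPC code.

```go
package main

import (
	"fmt"
	"io"
)

// receiver is the only behavior the loop depends on.
type receiver interface {
	Recv() (string, error)
}

// fakeStream is a hypothetical stand-in for the generated client stream.
// It yields its queued messages and then io.EOF, like a server whose
// handler returned nil.
type fakeStream struct {
	msgs []string
}

func (f *fakeStream) Recv() (string, error) {
	if len(f.msgs) == 0 {
		return "", io.EOF
	}
	m := f.msgs[0]
	f.msgs = f.msgs[1:]
	return m, nil
}

// drain reads until io.EOF and returns everything received so far.
func drain(r receiver) ([]string, error) {
	var out []string
	for {
		m, err := r.Recv()
		if err == io.EOF {
			return out, nil // normal end of stream
		}
		if err != nil {
			return out, err // any other error aborts the loop
		}
		out = append(out, m)
	}
}

func main() {
	got, err := drain(&fakeStream{msgs: []string{"s", "o", "m"}})
	fmt.Println(got, err)
}
```

Keeping the loop behind a small interface like this is one way to unit test stream-handling logic without starting a server.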
Client Stream
Client Stream mirrors Server Stream, except that now the server calls stream.Recv() in a for loop, blocking on each call until a client message arrives. When the client has sent everything, it calls stream.CloseAndRecv(), which closes the sending side and then blocks waiting for the response. The server sees io.EOF from Recv(), calls stream.SendAndClose() to return the response body, and the stream ends. The client only signals that it has finished sending; the server can also end the whole stream early by returning before the client is done.
// Server
func (s *service) ClientStream(stream pb.BaseService_ClientStreamServer) error {
	output := ""
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			// The client has finished sending: reply once and close the stream.
			return stream.SendAndClose(&pb.StreamResponse{Output: output})
		}
		if err != nil {
			// r is nil on error, so return instead of continuing the loop.
			return err
		}
		output += r.Input
	}
}
// Client
func clientStream(client pb.BaseServiceClient, input string) error {
	stream, err := client.ClientStream(context.Background())
	if err != nil {
		return err
	}
	for _, s := range input {
		fmt.Println("Client Stream Send:", string(s))
		if err := stream.Send(&pb.StreamRequest{Input: string(s)}); err != nil {
			return err
		}
	}
	// Close the sending side and wait for the single response.
	res, err := stream.CloseAndRecv()
	if err != nil {
		return err
	}
	fmt.Println("Client Stream Recv:", res.Output)
	return nil
}
The client executes clientStream(c, "something") and the output is as follows.
Client Stream Send: s
Client Stream Send: o
Client Stream Send: m
Client Stream Send: e
Client Stream Send: t
Client Stream Send: h
Client Stream Send: i
Client Stream Send: n
Client Stream Send: g
Client Stream Recv: something
Bidirectional Streaming
With bidirectional streaming, the client sends a stream of requests and the server responds with a stream of responses, but the exact interaction depends on the logic you write, much like a chat room: once the room is open, how to reply, how much to reply, when to close, and who closes all depend on the use case. In the following example, the client sends the numbers 0 through 9 to the server, and the server replies to each one with the running total.
// Client
func streaming(client pb.BaseServiceClient) error {
	stream, err := client.Streaming(context.Background())
	if err != nil {
		return err
	}
	for n := 0; n < 10; n++ {
		fmt.Println("Streaming Send:", n)
		if err := stream.Send(&pb.StreamRequest{Input: strconv.Itoa(n)}); err != nil {
			return err
		}
		res, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		fmt.Println("Streaming Recv:", res.Output)
	}
	// CloseSend tells the server that no more requests will follow.
	return stream.CloseSend()
}
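// Server
func (s *service) Streaming(stream pb.BaseService_StreamingServer) error {
	n := 0 // running total of everything received so far
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// The client called CloseSend; returning nil ends the stream.
			return nil
		}
		if err != nil {
			return err
		}
		v, err := strconv.Atoi(req.Input)
		if err != nil {
			return err
		}
		n += v
		if err := stream.Send(&pb.StreamResponse{Output: strconv.Itoa(n)}); err != nil {
			return err
		}
	}
}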
The client executes streaming(c) and the output is as follows.
Streaming Send: 0
Streaming Recv: 0
Streaming Send: 1
Streaming Recv: 1
Streaming Send: 2
Streaming Recv: 3
Streaming Send: 3
Streaming Recv: 6
Streaming Send: 4
Streaming Recv: 10
.....
Summary
This article is only a simple demo of the three types of gRPC streaming. In practice, which one to use depends on the business scenario. A bidirectional stream suits chat-room-style interaction or a long-lived connection, while a unidirectional stream suits cases where one side sends a large number of messages that the receiver can process in batches.