1. Unary RPC
proto文件如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
syntax = "proto3";
option go_package=".;service";
message HelloRequest {
// Name of the person to greet
string name = 1;
}
message HelloResponse {
// Greeting message
string greeting = 1;
}
service HelloService {
// RPC method to say hello
rpc SayHello (HelloRequest) returns (HelloResponse){}
}
|
使用命令(注意命令路径和自己的对应):
1
|
protoc -I . --go-grpc_out=require_unimplemented_servers=false:. --go_out=. *.proto
|
对应目录上有xx.pb.go和xx_grpc.pb.go然后对应目录实现服务端接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"net"
"test_grpc/service"
)
type HelloService struct {
}
func (hs *HelloService) SayHello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) {
resp := &service.HelloResponse{
Greeting: fmt.Sprintf("hello %s --from Golang Server", req.Name),
}
return resp, nil
}
func main() {
// listen on 127.0.0.1:50051
listen, err := net.Listen("tcp", "127.0.0.1:50051")
if err != nil {
fmt.Println("Error happened when listen on 127.0.0.1:50051:", err)
return
}
// grpc server
s := grpc.NewServer()
// register HelloService in grpc server
service.RegisterHelloServiceServer(s, &HelloService{})
// start rpc server
fmt.Println("Golang rpc server is waiting messages......")
if err = s.Serve(listen); err != nil {
fmt.Println("Error happened when start rpc server:", err)
return
}
}
|
客户端接口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"test_grpc/service"
"time"
)
func main() {
// connect to server
conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Println("Connect to rpc server err:", err)
return
}
defer conn.Close()
// init service client
c := service.NewHelloServiceClient(conn)
// init context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// send message
req := &service.HelloRequest{Name: "Golang"}
r, err := c.SayHello(ctx, req)
if err != nil {
fmt.Println("Send message err:", err)
return
}
fmt.Println("Client:", r.Greeting)
}
|
实际上为了更好得感受gRPC这种跨语言调用的感觉,可以尝试使用python编写client端代码,直接复制proto文件,在python中使用以下命令生成对应的proto文件(注意命令和自己的对应):
1
|
python -m grpc_tools.protoc -I . --python_out=. --pyi_out=. --grpc_python_out=. *.proto
|
使用python实现的客户端代码如下:
1
2
3
4
5
6
7
8
9
10
|
# client template
# grpc server address
channel = grpc.insecure_channel("127.0.0.1:50051")
stub = hello_pb2_grpc.HelloServiceStub(channel)
# send request
response = stub.SayHello(hello_pb2.HelloRequest(name="Python"))
print(response.greeting)
# hello Python --from Golang Server
|
2. Server-side streaming RPC
重新给出这个的proto文件,服务端将以流式数据的形式发送给客户端数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
syntax = "proto3";
option go_package=".;service";
message HelloRequest {
// Name of the person to greet
string name = 1;
}
message HelloResponse {
// Greeting message
string greeting = 1;
}
service HelloService {
// RPC method to say hello
rpc SayHello (HelloRequest) returns (stream HelloResponse){}
}
|
同理,生成对应的proto文件后,在对应的文件先生成server端的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
package main
import (
"fmt"
"google.golang.org/grpc"
"net"
"test_grpc/service"
)
type HelloService struct {
}
func (hs *HelloService) SayHello(req *service.HelloRequest, stream service.HelloService_SayHelloServer) error {
resp := &service.HelloResponse{
Greeting: fmt.Sprintf("hello %s --from Golang Server", req.Name),
}
// 连续发送5次
for i := 0; i < 5; i++ {
if err := stream.Send(resp); err != nil {
return err
}
}
return nil
}
func main() {
// listen on 127.0.0.1:50051
listen, err := net.Listen("tcp", "127.0.0.1:50051")
if err != nil {
fmt.Println("Error happened when listen on 127.0.0.1:50051:", err)
return
}
// grpc server
s := grpc.NewServer()
// register HelloService in grpc server
service.RegisterHelloServiceServer(s, &HelloService{})
// start rpc server
fmt.Println("Golang rpc server is waiting messages......")
if err = s.Serve(listen); err != nil {
fmt.Println("Error happened when start rpc server:", err)
return
}
}
|
同理给出客户端的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log"
"test_grpc/service"
"time"
)
func main() {
// connect to server
conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Println("Connect to rpc server err:", err)
return
}
defer conn.Close()
// init service client
c := service.NewHelloServiceClient(conn)
// init context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// send message
req := &service.HelloRequest{Name: "Golang"}
stream, err := c.SayHello(ctx, req)
if err != nil {
fmt.Println("Send message err:", err)
return
}
// 加载消息
for {
resp, err := stream.Recv()
// 读到结束标志
if err == io.EOF {
log.Fatalf("end.....")
break
}
if err != nil {
log.Fatalf("failed to receive response: %v", err)
}
log.Printf("Greeting: %s", resp.Greeting)
}
}
|
3. Client-side streaming RPC
对应的proto文件如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
syntax = "proto3";
option go_package=".;service";
message HelloRequest {
// Name of the person to greet
string name = 1;
}
message HelloResponse {
// Greeting message
string greeting = 1;
}
service HelloService {
// RPC method to say hello
rpc SayHello (stream HelloRequest) returns (HelloResponse){}
}
|
同理使用protoc命令生成对应的proto文件,后先编写client端的代码,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"test_grpc/service"
"time"
)
func main() {
// connect to server
conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Println("Connect to rpc server err:", err)
return
}
defer conn.Close()
// init service client
c := service.NewHelloServiceClient(conn)
// init context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create stream
stream, err := c.SayHello(ctx)
if err != nil {
log.Fatalf("could not greet: %v", err)
}
names := []string{"World", "Gophers", "Anthropic"}
for _, name := range names {
// request body
req := &service.HelloRequest{Name: name}
if err := stream.Send(req); err != nil {
log.Fatalf("faild to send request: %v", err)
}
}
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Greeting: %s", resp.Greeting)
}
|
对应得完成服务端的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package main
import (
"fmt"
"google.golang.org/grpc"
"io"
"net"
"strings"
"test_grpc/service"
)
type HelloService struct {
}
func (hs *HelloService) SayHello(stream service.HelloService_SayHelloServer) error {
var strs []string
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
strs = append(strs, msg.Name)
}
resp := &service.HelloResponse{Greeting: strings.Join(strs, " ")}
err := stream.SendAndClose(resp)
if err != nil {
return err
}
return nil
}
func main() {
// listen on 127.0.0.1:50051
listen, err := net.Listen("tcp", "127.0.0.1:50051")
if err != nil {
fmt.Println("Error happened when listen on 127.0.0.1:50051:", err)
return
}
// grpc server
s := grpc.NewServer()
// register HelloService in grpc server
service.RegisterHelloServiceServer(s, &HelloService{})
// start rpc server
fmt.Println("Golang rpc server is waiting messages......")
if err = s.Serve(listen); err != nil {
fmt.Println("Error happened when start rpc server:", err)
return
}
}
|
4. Bidirectional streaming RPC
新的proto文件被如下给出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
syntax = "proto3";
option go_package=".;service";
message HelloRequest {
// Name of the person to greet
string name = 1;
}
message HelloResponse {
// Greeting message
string greeting = 1;
}
service HelloService {
// RPC method to say hello
rpc SayHello (stream HelloRequest) returns (stream HelloResponse){}
}
|
和上文中的操作一致,同时给出server端的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package main
import (
"fmt"
"google.golang.org/grpc"
"io"
"log"
"net"
"test_grpc/service"
)
type HelloService struct {
}
func (hs *HelloService) SayHello(stream service.HelloService_SayHelloServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
name := msg.Name
resp := &service.HelloResponse{Greeting: name}
if err = stream.Send(resp); err != nil {
log.Fatalf("Failed to send a resp:%s", err)
}
}
return nil
}
func main() {
// listen on 127.0.0.1:50051
listen, err := net.Listen("tcp", "127.0.0.1:50051")
if err != nil {
fmt.Println("Error happened when listen on 127.0.0.1:50051:", err)
return
}
// grpc server
s := grpc.NewServer()
// register HelloService in grpc server
service.RegisterHelloServiceServer(s, &HelloService{})
// start rpc server
fmt.Println("Golang rpc server is waiting messages......")
if err = s.Serve(listen); err != nil {
fmt.Println("Error happened when start rpc server:", err)
return
}
}
|
同时给出下面的client端的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log"
"test_grpc/service"
"time"
)
func main() {
// connect to server
conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Println("Connect to rpc server err:", err)
return
}
defer conn.Close()
// init service client
c := service.NewHelloServiceClient(conn)
// init context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create stream
stream, err := c.SayHello(ctx)
if err != nil {
log.Fatalf("could not greet: %v", err)
}
names := []string{"World", "Gophers", "Anthropic"}
waitc := make(chan struct{})
go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
close(waitc)
return
}
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Greeting: %s", resp.Greeting)
}
}()
go func() {
for _, name := range names {
// request body
req := &service.HelloRequest{Name: name}
if err := stream.Send(req); err != nil {
log.Fatalf("faild to send request: %v", err)
}
// send delay
time.Sleep(1)
}
// 发送结束的消息
if err := stream.CloseSend(); err != nil {
log.Fatalf("failed to close stream: %v", err)
}
}()
<-waitc
}
|
一定要注意关闭发送或者避免针对一个已经关闭stream进行发送消息,读取消息是被允许的,这里有一点类似chan
4. ALTS
4.1 ALTS的介绍
应用层传输安全(ALTS)是谷歌开发的一种相互验证和传输加密系统。它用于确保谷歌基础设施内 RPC 通信的安全。ALTS 类似于相互 TLS,但经过设计和优化,可满足 Google 生产环境的需要。ALTS在gRPC中有以下的特征:
- 使用ALTS作为传输协议创建gRPC的服务端和客户端;
- ALSTS是一个端到端的保护,具有隐私性和完成性;
- 应用可以访问对等信息比如对等服务账户;
- 支持客户端和服务端的认知;
- 最小的代码更改就能使用ALTS;
值得注意的是ALTS被全部发挥作用如果应用程序运行在CE或者GKE中
4.2 gRPC客户端使用ALTS传输安全协议
gRPC客户端使用ALTS认证去连接服务端,正如下面代码中所描述的:
1
2
3
4
5
6
7
8
|
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/alts"
)
altsTC := alts.NewClientCreds(alts.DefaultClientOptions())
// connect to server
conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithTransportCredentials(altsTC))
|
gRPC服务端能够使用ALTS认证来运行客户端连接到它,正如下面的描述:
1
2
3
4
5
6
7
|
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/alts"
)
altsTC := alts.NewServerCreds(alts.DefaultServerOptions())
server := grpc.NewServer(grpc.Creds(altsTC))
|
4.3 Server Authorization
gRPC 内置了使用 ALTS 的服务器授权支持。使用 ALTS 的 gRPC 客户端可以在建立连接前设置预期的服务器服务账户。然后,在握手结束时,服务器授权会保证服务器身份与客户端指定的服务账户之一相匹配。否则,连接将失败。
1
2
3
4
5
6
7
8
9
|
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/alts"
)
clientOpts := alts.DefaultClientOptions()
clientOpts.TargetServiceAccounts = []string{expectedServerSA}
altsTC := alts.NewClientCreds(clientOpts)
conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(altsTC))
|
|