当前位置:网站首页>【Golang | gRPC】使用gRPC实现Watch功能
【Golang | gRPC】使用gRPC实现Watch功能
2022-07-21 21:52:00 【田土豆】
环境:
Golang: go1.18.2 windows/amd64
gRPC: v1.47.0
Protobuf: v1.28.0
完整代码:
https://github.com/WanshanTian/GolangLearning
cd GolangLearning/RPC/gRPC-Watch
1. 简介
客户端可以通过 Watch
机制来订阅服务器上某一节点的数据或状态,当其发生变化时可以收到相应的通知。
前阵子学习了gRPC
的服务端流模式,今天我们就用这个流模式来具体实现Watch功能
2. 实践
现有下面一种场景:服务端保存着用户的年龄信息,客户端发送含用户姓名的message可以获取对应用户的年龄或者更新对应用户的年龄(年龄+1);通过Watch功能可以实时监听用户年龄的状态,当有用户的年龄发生变化时,客户端收到通知
2.1 proto文件
2.1.1 新建gRPC-Watch文件夹,使用go mod init初始化,创建pb文件夹,新建query.proto文件
syntax = "proto3";
package pb;
option go_package= ".;pb";
import "google/protobuf/empty.proto";
// 定义查询服务包含的方法
service Query {
rpc GetAge (userInfo) returns (ageInfo) {};
rpc Update (userInfo) returns (google.protobuf.Empty) {};
rpc Watch (watchTime) returns (stream userInfo){}
}
// 请求用的结构体,包含一个name字段
message userInfo {
string name = 1;
}
// 响应用的结构体,包含一个age字段
message ageInfo {
int32 age = 1;
}
// watch的时间
message watchTime{
int32 time = 1;
}
GetAge
和Update
方法分别用于获取年龄和更新年龄,均采用简单RPC方式Watch
方法用于监听年龄状态的变化,采用服务端流方式- 当gRPC的方法不需要请求message或者不需要响应message时,可以先
import "google/protobuf/empty.proto"
,然后直接使用google.protobuf.Empty
2.1.2 在.\gRPC-Watch\pb目录下使用protoc工具进行编译,在pb文件夹下直接生成.pb.go和_grpc.pb.go文件。关于protoc的详细使用可以查看【Golang | gRPC】使用protoc编译.proto文件
protoc --go_out=./ --go-grpc_out=./ .\query.proto
2.2 服务端
在gRPC-Watch目录下新建Server文件夹,新建main.go文件
2.2.1 下面我们通过Query这个结构体具体实现QueryServer接口
var userinfo = map[string]int32{
"foo": 18,
"bar": 20,
}
// Query 结构体,实现QueryServer接口
type Query struct {
mu sync.Mutex
ch chan string
pb.UnimplementedQueryServer // 涉及版本兼容
}
func (q *Query) GetAge(ctx context.Context, info *pb.UserInfo) (*pb.AgeInfo, error) {
age := userinfo[info.GetName()]
var res = new(pb.AgeInfo)
res.Age = age
return res, nil
}
//Update用于更新用户年龄,通过sync.Mutex加锁,如果年龄有更新,则向chan发送对应的用户名
func (q *Query) Update(ctx context.Context, info *pb.UserInfo) (*emptypb.Empty, error) {
q.mu.Lock()
defer q.mu.Unlock()
name := info.GetName()
userinfo[name] += 1
if q.ch != nil {
q.ch <- name
}
return &emptypb.Empty{
}, nil
}
//Watch用于监听用户年龄状态的变化,先实例化一个chan,然后通过select方法监听chan内是否有数据,
//如果有则通过服务端流向客户端发送message,如果超过指定时间无更新,则退出
func (q *Query) Watch(timeSpecify *pb.WatchTime, serverStream pb.Query_WatchServer) error {
if q.ch != nil {
return errors.New("Watching is running, please stop first")
}
q.ch = make(chan string, 1)
for {
select {
case <-time.After(time.Second * time.Duration(timeSpecify.GetTime())):
close(q.ch)
q.ch = nil
return nil
case nameModify := <-q.ch:
log.Printf("The name of %s is updated\n", nameModify)
serverStream.Send(&pb.UserInfo{
Name: nameModify})
}
}
}
Update
用于更新用户年龄,通过sync.Mutex加锁,防止冲突;如果年龄有更新且watch功能开启,则向chan发送对应的用户名Watch
用于监听用户年龄状态的变化,先实例化一个chan,表示开启watch功能。然后通过select
方法监听chan内是否有数据,如果有则通过服务端流向客户端发送message,如果超过指定时间年龄无更新,则关闭watch功能并退出- 当Watch功能已经开启时,如果再次开启会返回报错
2.3 客户端
在gRPC-Watch目录下新建Client文件夹,新建main.go文件
func main() {
//建立无认证的连接
conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panic(err)
}
defer conn.Close()
client := pb.NewQueryClient(conn)
//RPC方法调用
ctx := context.Background()
//先获取更新前的年龄
age, _ := client.GetAge(ctx, &pb.UserInfo{
Name: "foo"})
log.Printf("Before updating, the age is %d\n", age.GetAge())
//更新年龄
log.Println("updating")
client.Update(ctx, &pb.UserInfo{
Name: "foo"})
//再获取更新后的年龄
age, _ = client.GetAge(ctx, &pb.UserInfo{
Name: "foo"})
log.Printf("After updating, the age is %d\n", age.GetAge())
}
2.4 Watch功能
在gRPC-Watch目录下新建Watch文件夹,新建main.go文件
func main() {
//建立无认证的连接
conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panic(err)
}
defer conn.Close()
client := pb.NewQueryClient(conn)
//RPC方法调用
stream, _ := client.Watch(context.Background(), &pb.WatchTime{
Time: 10})
for {
userInfoRecv, err := stream.Recv()
if err == io.EOF {
log.Println("end of watch")
break
} else if err != nil {
log.Println(err)
break
}
log.Printf("The name of %s is updated\n", userInfoRecv.GetName())
}
}
2.5 运行
首先开启服务端,然后开启Watch功能,在开启客户端,有如下输出结果(当指定时间内没有年龄更新,Watch自动退出):
当Watch已经开启,并再次开启时,会返回如下自定义的报错,对应2.2代码里的return errors.New("Watching is running, please stop first")
3. 总结
- Watch功能整体思路就是:服务端实例化一个chan,如果监听对象发生变化,向chan中发送值,客户端从chan中收到值,如果没有就一直阻塞
边栏推荐
- [游记]来自学长的馈赠2-2022.7.21
- 【php环境搭建/wamp/解释器/下载】
- JMeter test script learning (login script)
- Pikachu character injection for Day2 POC and exp learning
- LSA type, OSPF optimization and topology configuration
- Disadvantages of database indexing
- 软件兼容性测试有什么作用?兼容性测试必备测试工具
- Ansible项目最佳实践
- 振奋人心!元宇宙!下一代互联网的财富风口
- shell学习笔记(六)——实战一:脚本编写
猜你喜欢
网络空间资产测绘
A-LOAM源码阅读
Day04 禅道-从安装到卸载
免费下载!《Databricks数据洞察:从入门到实践》
【MySQL】20-MySQL如何创建和管理表超详细汇总
ECCV 2022 | 通过分析图像匹配解释深度伪造检测
Programming skills │ amazing syntax of advanced markdown editor
Hcip day 12
【FPGA教程案例34】通信案例4——基于FPGA的QPSK调制信号产生,通过matlab测试其星座图
How does redis like and cancel? (glory Collection Edition)
随机推荐
【oops-framework】全局事件
长安全新最强SUV来袭,内饰很立体,科技感爆棚
A frequency compensation method based on all phase FFT amplitude
基于JSP/SERVLET学生管理系统
Mapping of cyberspace assets
How to play a data mining competition Advanced Edition
Flutter实战-自定义键盘(二)
一个15年ABAP老兵的建议:了解这些基础知识,对ABAP开发有百利而无一害
From the 5K monthly salary of Tencent outsourcing to the 15K monthly salary of transferred regular employees, who can understand the 168 days of sadness
如何抓住元宇宙中机遇
lua基础
[游记]来自学长的馈赠2-2022.7.21
网络安全--ESP8266烧录、测试、擦除WiFi杀手(详细教程、附所有工具下载地址)
31. [memcpy function and strcpy function]
缺陷报告作业
SQL bypass case
一文读懂Elephant Swap的LaaS方案的优势之处
PKI体系快速了解
Data type conversion in MySQL 8 | learning functions cast() and convert()
Robust optimization of space-based relay in Internet of things based on UAV