[Golang | gRPC] Using gRPC to implement a Watch feature
2022-07-22 12:37:00 【Field potato】
Environment:
Golang: go1.18.2 windows/amd64
gRPC: v1.47.0
Protobuf: v1.28.0
Complete code:
https://github.com/WanshanTian/GolangLearning
cd GolangLearning/RPC/gRPC-Watch
1. Introduction
With a Watch mechanism, a client subscribes to the data or state of a node on the server and receives a notification whenever it changes.
I looked at gRPC's server-side streaming mode a while ago; today we use that mode to implement a Watch feature.
2. Practice
Consider the following scenario: the server stores users' ages. By sending a message, the client can get the age of a user or update it (age + 1). With the Watch feature, a client can monitor the ages in real time: whenever a user's age changes, the watching client receives a notification.
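Before any gRPC is involved, the state behind this scenario can be sketched as a mutex-guarded map. This is only a minimal illustration, not the author's server code: the `ageStore` type and its method names are hypothetical, and the initial ages are the ones used in the server code later in this post.

```go
package main

import (
	"fmt"
	"sync"
)

// ageStore is a hypothetical stand-in for the server-side state:
// a map from user name to age, guarded by a mutex.
type ageStore struct {
	mu   sync.Mutex
	ages map[string]int32
}

func newAgeStore() *ageStore {
	return &ageStore{ages: map[string]int32{"foo": 18, "bar": 20}}
}

// GetAge returns the stored age; an unknown name yields 0.
func (s *ageStore) GetAge(name string) int32 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.ages[name]
}

// Update increments the age by one and returns the new value.
func (s *ageStore) Update(name string) int32 {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ages[name]++
	return s.ages[name]
}

func main() {
	s := newAgeStore()
	fmt.Println(s.GetAge("foo")) // 18
	fmt.Println(s.Update("foo")) // 19
	fmt.Println(s.GetAge("foo")) // 19
}
```

Taking the lock in the read path as well as the write path keeps concurrent requests from racing on the map.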
2.1 The proto file
2.1.1 Create a gRPC-Watch folder and initialize it with go mod init, then create a pb folder containing a query.proto file
syntax = "proto3";
package pb;
option go_package = ".;pb";
import "google/protobuf/empty.proto";

// Methods provided by the Query service
service Query {
  rpc GetAge (userInfo) returns (ageInfo) {};
  rpc Update (userInfo) returns (google.protobuf.Empty) {};
  rpc Watch (watchTime) returns (stream userInfo) {};
}

// Request message, contains a name field
message userInfo {
  string name = 1;
}

// Response message, contains an age field
message ageInfo {
  int32 age = 1;
}

// How long to keep watching without an update, in seconds
message watchTime {
  int32 time = 1;
}
- GetAge and Update get and update the age respectively; both are simple (unary) RPCs
- Watch monitors changes in the age and uses server-side streaming
- When a gRPC method needs no request message or no response message, add import "google/protobuf/empty.proto" and use google.protobuf.Empty directly
2.1.2 In the .\gRPC-Watch\pb directory, compile with the protoc tool; this generates the .pb.go and _grpc.pb.go files directly in the pb folder. For details on using protoc, see the post 【Golang | gRPC】 Use protoc compile .proto file
protoc --go_out=./ --go-grpc_out=./ .\query.proto
2.2 Server
In the gRPC-Watch directory, create a Server folder containing a main.go file
2.2.1 First, implement the QueryServer interface with a Query struct
var userinfo = map[string]int32{
	"foo": 18,
	"bar": 20,
}

// Query implements the QueryServer interface.
type Query struct {
	mu sync.Mutex  // guards userinfo and ch
	ch chan string // non-nil while a watch is active
	pb.UnimplementedQueryServer // embedded for forward compatibility
}

// GetAge returns the age of the named user.
func (q *Query) GetAge(ctx context.Context, info *pb.UserInfo) (*pb.AgeInfo, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	return &pb.AgeInfo{Age: userinfo[info.GetName()]}, nil
}

// Update increments the user's age under the sync.Mutex lock. If a watch is
// active, the user name is sent to the chan.
func (q *Query) Update(ctx context.Context, info *pb.UserInfo) (*emptypb.Empty, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	name := info.GetName()
	userinfo[name]++
	if q.ch != nil {
		select {
		case q.ch <- name:
		default:
			// The watcher has not consumed the previous name yet. Drop this
			// notification instead of blocking while holding the lock.
		}
	}
	return &emptypb.Empty{}, nil
}

// Watch monitors changes in users' ages. It instantiates a chan and uses
// select to wait on it: each name received is sent to the client over the
// server stream. If no update arrives within the specified time, it returns.
func (q *Query) Watch(timeSpecify *pb.WatchTime, serverStream pb.Query_WatchServer) error {
	q.mu.Lock()
	if q.ch != nil {
		q.mu.Unlock()
		return errors.New("Watching is running, please stop first")
	}
	ch := make(chan string, 1)
	q.ch = ch
	q.mu.Unlock()
	// Clear q.ch under the lock on exit, so Update never sends to a stale
	// chan and a later Watch can start.
	defer func() {
		q.mu.Lock()
		q.ch = nil
		q.mu.Unlock()
	}()
	timeout := time.Second * time.Duration(timeSpecify.GetTime())
	for {
		select {
		case <-serverStream.Context().Done(): // the client went away
			return serverStream.Context().Err()
		case <-time.After(timeout):
			return nil
		case nameModify := <-ch:
			log.Printf("The name of %s is updated\n", nameModify)
			if err := serverStream.Send(&pb.UserInfo{Name: nameModify}); err != nil {
				return err
			}
		}
	}
}
- Update updates the user's age under a sync.Mutex lock to prevent conflicts; if the age was updated and a watch is active, it sends the user name to the chan
- Watch monitors changes in users' ages. It instantiates a chan, which also marks the watch as active, and then uses select to wait for data on the chan: when a name arrives it is sent to the client over the server stream, and if no age is updated within the specified time, the watch is turned off and the method returns
- If Watch is called while a watch is already active, an error is returned
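The select loop described above can be tried in isolation, without gRPC. The following is a minimal sketch under stated assumptions: `watch` and `demo` are hypothetical names, and the `send` callback stands in for the server stream's Send method.

```go
package main

import (
	"fmt"
	"time"
)

// watch forwards names from ch to send until no name arrives for idle,
// then returns nil. A send error ends the loop early.
func watch(ch <-chan string, idle time.Duration, send func(string) error) error {
	for {
		select {
		case <-time.After(idle): // a fresh timer per iteration: the timeout restarts after each update
			return nil
		case name := <-ch:
			if err := send(name); err != nil {
				return err
			}
		}
	}
}

// demo feeds two updates into the chan and collects what watch forwards.
func demo() ([]string, error) {
	ch := make(chan string, 1)
	go func() {
		ch <- "foo"
		time.Sleep(20 * time.Millisecond)
		ch <- "bar"
	}()
	var got []string
	err := watch(ch, 200*time.Millisecond, func(name string) error {
		got = append(got, name)
		return nil
	})
	return got, err
}

func main() {
	got, err := demo()
	fmt.Println(got, err) // [foo bar] <nil>
}
```

Because time.After is evaluated again on every pass through the loop, the timeout measures the quiet period since the last update, not the total lifetime of the watch.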
2.3 Client
In the gRPC-Watch directory, create a Client folder containing a main.go file
func main() {
	// Connect without transport security
	conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()
	client := pb.NewQueryClient(conn)
	// RPC method calls
	ctx := context.Background()
	// Get the age before the update
	age, err := client.GetAge(ctx, &pb.UserInfo{Name: "foo"})
	if err != nil {
		log.Panic(err)
	}
	log.Printf("Before updating, the age is %d\n", age.GetAge())
	// Update the age
	log.Println("updating")
	if _, err := client.Update(ctx, &pb.UserInfo{Name: "foo"}); err != nil {
		log.Panic(err)
	}
	// Get the age after the update
	age, err = client.GetAge(ctx, &pb.UserInfo{Name: "foo"})
	if err != nil {
		log.Panic(err)
	}
	log.Printf("After updating, the age is %d\n", age.GetAge())
}
2.4 Watch feature
In the gRPC-Watch directory, create a Watch folder containing a main.go file
func main() {
	// Connect without transport security
	conn, err := grpc.Dial(":1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()
	client := pb.NewQueryClient(conn)
	// RPC method call: watch until 10 seconds pass without an update
	stream, err := client.Watch(context.Background(), &pb.WatchTime{Time: 10})
	if err != nil {
		log.Panic(err)
	}
	for {
		userInfoRecv, err := stream.Recv()
		if err == io.EOF {
			// The server handler returned nil: the watch ended normally
			log.Println("end of watch")
			break
		} else if err != nil {
			log.Println(err)
			break
		}
		log.Printf("The name of %s is updated\n", userInfoRecv.GetName())
	}
}
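The receive loop's termination rule can be exercised without a server. This is a minimal sketch in which `nameStream`, `fakeStream`, and `drain` are hypothetical stand-ins; the only behavior taken from gRPC-Go is that Recv reports io.EOF once the stream has ended normally.

```go
package main

import (
	"fmt"
	"io"
)

// nameStream is a hypothetical stand-in for the generated client stream.
type nameStream interface {
	Recv() (string, error)
}

// fakeStream replays a fixed list of names, then reports io.EOF.
type fakeStream struct{ names []string }

func (f *fakeStream) Recv() (string, error) {
	if len(f.names) == 0 {
		return "", io.EOF
	}
	name := f.names[0]
	f.names = f.names[1:]
	return name, nil
}

// drain reads until the stream ends. io.EOF is the normal end and is not
// reported as an error; any other error is returned with the names read so far.
func drain(s nameStream) ([]string, error) {
	var got []string
	for {
		name, err := s.Recv()
		if err == io.EOF {
			return got, nil
		}
		if err != nil {
			return got, err
		}
		got = append(got, name)
	}
}

func main() {
	got, err := drain(&fakeStream{names: []string{"foo", "foo"}})
	fmt.Println(got, err) // [foo foo] <nil>
}
```

Treating io.EOF separately from other errors is what lets the Watch program tell "the watch timed out on the server" apart from a broken connection or a rejected call.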
2.5 Running
Start the server first, then start Watch, then run the client. Watch reports the age update; when no further update arrives within the specified time, Watch exits automatically.
If Watch is already running and is started again, the custom error from the code in 2.2 is returned: return errors.New("Watching is running, please stop first")
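The "only one watch at a time" rule can be isolated into a small helper. This is a sketch under assumptions, not the author's code: `watchSlot` and its methods are hypothetical, and the non-blocking notify is one possible design choice for keeping an updater from stalling when the watcher is slow.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errAlreadyWatching = errors.New("Watching is running, please stop first")

// watchSlot is a hypothetical helper that allows at most one active watch.
// The mutex guards ch, so start, stop and notify never race on it.
type watchSlot struct {
	mu sync.Mutex
	ch chan string
}

// start claims the slot, or fails if a watch is already active.
func (w *watchSlot) start() (<-chan string, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.ch != nil {
		return nil, errAlreadyWatching
	}
	w.ch = make(chan string, 1)
	return w.ch, nil
}

// stop releases the slot so that a later start succeeds.
func (w *watchSlot) stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.ch = nil
}

// notify reports whether the name was queued for the active watch.
// It never blocks: with no watch, or with a full buffer, it returns false.
func (w *watchSlot) notify(name string) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.ch == nil {
		return false
	}
	select {
	case w.ch <- name:
		return true
	default:
		return false
	}
}

func main() {
	var slot watchSlot
	_, err := slot.start()
	fmt.Println(err) // <nil>
	_, err = slot.start()
	fmt.Println(err) // Watching is running, please stop first
	slot.stop()
	_, err = slot.start()
	fmt.Println(err) // <nil>
}
```

The chan is never closed here; setting it to nil under the lock is enough to stop further sends, which avoids a send on a closed chan.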
3. Summary
- The overall idea of the Watch feature: the server instantiates a chan; when the watched object changes, a value is sent to the chan; the Watch method receives the value from the chan and forwards it to the client over the stream. While there is no value, the receive blocks, until the specified time runs out