首先需要提到的是我们使用grpc的传输协议是http/2.0版本
他是一种双向流控制协议
其中的核心包含stream、frame、message其中message是最小的传输单元
他还实现了对于message层面的权重控制,可以让一些重要的资源优先传输
相当于写一份协议文件,可以在各种语言编译出对应语言的interface,在去实现这个interface即可
名称通常以*.proto结尾命名
首先我们需要编写proto文件结尾的地方我们可以以stream 来声明这是一个客户端流传输协议还是服务端流传输协议还是双向流传输
注意这里的枚举类型需要以0开始作为标识符
其中的10000-19999是grpc的预留标识符
syntax = 'proto3';
message Work {
enum Operation {
ADD = 0;
SUBTRACT = 1;
}
int32 num1 = 1;
int32 num2 = 2;
Operation op = 3;
}
message Result {
float result = 1;
}
message City {
string name = 1;
}
message Subject {
string name = 1;
}
message Delta {
int32 val = 1;
}
message Sum {
int32 val = 2;
}
message Number {
int32 val = 1;
}
message Answer {
int32 val = 1;
string desc = 2;
}
service Demo {
// unary rpc
rpc Calculate (Work) returns (Result) {
}
// SERVER streaming rpc 根据城市获取小区、
rpc GetCommunity (City) returns (stream Subject) {
}
//client streaming rpc 累加获取结果
rpc Accumulate (stream Delta) returns (Sum) {
}
//双向 猜数字大小
rpc GuessNumber (stream Number) returns (stream Answer) {
}
}
编写好了之后使用 python -m grpc_tools.protoc -I. --python_out=.. --grpc_python_out=.. req.proto
命令来生成对应的server serverstub clientstub 以及所有proto中的message消息体
注意其中参数I必选,表示查询后续参数的路径 这里我们用.来表示当前路径
然后开始编写server端
import grpc_demo.req_pb2_grpc as PService
import grpc_demo.req_pb2 as Mess
import grpc
from concurrent import futures
# 实现被调用的具体代码
class DemoService(PService.DemoServicer):
def __init__(self):
self.city_db = {
"beijing": [' python ', 'c++', 'go'],
"shanghai": [' 产品 ', '123', '数学'],
"wuhan": [' 语文 ', '英语', '数学']
}
self.answers = list(range(10))
def Calculate(self, request, context):
if request.num1 == 0:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details("cannot divide by 0")
return Mess.Result()
if request.op == Mess.Work.ADD:
result = request.num1 + request.num2
elif request.op == Mess.Work.SUBTRACT:
result = request.num1 - request.num2
else:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details("invalid op parameter")
return Mess.Result()
return Mess.Result(result=result)
pass
def GetCommunity(self, request, context):
city = request.name
subs = self.city_db.get(city)
for sub in subs:
import time
time.sleep(1)
yield Mess.Subject(name=sub)
def Accumulate(self, request_iterator, context):
sum = 0
for num in request_iterator:
sum += num.val
print(sum)
return Mess.Sum(val=sum)
def GuessNumber(self, request_iterator, context):
for request in request_iterator:
if request.val in self.answers:
print(request.val)
yield Mess.Answer(val=request.val, desc='bingo')
# 开启服务器
def serve():
# 创建服务器-对象 多线程服务器
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# 注册实现的服务方法到服务器对象中
PService.add_DemoServicer_to_server(DemoService(), server)
# 设置服务地址
server.add_insecure_port('127.0.0.1:9999')
# 开启服务
print('start server')
server.start()
# 关闭服务
import time
try:
time.sleep(1000)
except KeyboardInterrupt:
server.stop(0)
print("stop server")
pass
if __name__ == '__main__':
serve()
这里我们需要从生成的demo文件中获取到demoserver来继承,然后逐一实现对应的方法,如果没有实现proto中定义的方法则会引发notImplement的异常
需要注意的是 如果定义的方法时服务端流 ,则在返回的时候需要使用yield关键字来返回
如果定义的是客户端流,则在接受参数时接收到的request是request_initiator迭代器
如果需要引发一个异常我们不能使用raise,而是使用
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details("cannot divide by 0")
return Mess.Result()
最终一定要返回一个空的消息体
有时候我们不知道返回的状态码该如何定义则可以直接使用grpc给我们提供的状态码在grpc.StatusCode
模块中