前言

RPC简介

RPC(Remote Procedure Call)是一种使程序能够像调用本地函数一样调用远程服务的方法。它屏蔽了底层的通信细节,使得开发人员无需关注远程调用的复杂性,只需像操作本地方法一样调用远程方法

参考资料

项目仓库

项目概述

框架流程图

image.png

代码调用流程

image.png

三个主体

  • zookeeper服务端

    • zooKeeper在这里作为服务方法的管理配置中心,负责管理服务方法提供者对外提供的服务方法
    • Rpc服务端与客户端的身份都是zookeeper客户端
    • zookeeper存储服务对象与服务方法的方式:

    image.png

  • Rpc服务端

    • Rpc服务端需要向zookeeper注册服务对象与服务方法,注册的内容是本机上提供该服务的ip+端口
    • 注册完服务后,启动epoll监听客户端的远端调用请求
    • 接收到Rpc客户端的远端调用后,先对调用参数进行反序列化,再调用本地方法处理该调用,最后将处理结果包装为响应,序列化后发出
  • Rpc客户端

    • Rpc客户端需要先从zookeeper中查询提供目标服务的Rpc服务端ip与端口
    • 查询到Rpc服务端的ip与端口向目标端口发起连接,之后就是send - recv的流程,区别是send前需要对请求内容进行序列化,recv后需要对响应内容进行反序列化

Protobuf

定义了两个proto文件

Krpcheader.proto

该文件定义了RPC调用的元数据头部

1
2
3
4
5
6
7
8
syntax = "proto3";
package Krpc;

message RpcHeader {
bytes service_name = 1;
bytes method_name = 2;
uint32 args_size = 3;
}
  • service_name:用于标识目标服务(如 UserServiceRpc
  • method_name:用于标识目标方法(如 Login
  • args_size:表示后续参数数据的字节长度,避免粘包问题

服务端接收到请求后,通过service_name和method_name找到对应的服务

user.proto

该文件定义了UserServiceRpc服务,以及它的两个方法LoginRegister

可以根据具体需求改动LoginRequestRegisterRequest中的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
syntax = "proto3";
package Kuser;

// 控制生成基于protobuf的通用RPC服务基类
option cc_generic_services = true;

message ResultCode{
int32 errcode = 1;
bytes errmsg = 2;
}

message LoginRequest {
bytes name = 1;
bytes pwd = 2;
}

message LoginResponse {
ResultCode result = 1;
bool success = 2;
}

message RegisterRequest {
uint32 id = 1;
bytes name = 2;
bytes pwd = 3;
}

message RegisterResponse {
ResultCode result = 1;
bool success = 2;
}

service UserServiceRpc{
rpc Login(LoginRequest) returns(LoginResponse);
rpc Register(RegisterRequest) returns(RegisterResponse);
}

Login和Register

  • 定义Login方法接收两个参数:name和pwd
  • 定义Register方法接收三个参数:id、name、pwd

cc_generic_services

启用cc_generic_services后,proto会生成两个C++类

  • UserServiceRpc:callee需要继承此类并实现 Login 和 Register 方法
  • UserServiceRpc_Stub:客户端存根类,由caller继承,通过 RpcChannel 发起调用

生成代码

调用以下命令生成proto文件对应的C++代码

1
protoc user.proto -I ./ --cpp_out=./user

生成的 user.h 和 user.cc 会被保存到 ./user 文件夹中

RPC服务端

服务端主文件Kserver.cpp

主文件由UserService和main函数两部分组成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <iostream>
#include <string>
#include "../user.pb.h"
#include "KrpcProvider.h"
#include "KrpcApplication.h"


// 服务端上的自定义UserService,继承自proto生成的UserServiceRpc
class UserService: public Kuser::UserServiceRpc{
public:
// 这个版本执行Login的本地任务
bool Login(std::string name, std::string pwd){
std::cout << "doing local service: Login" << std::endl;
std::cout << "name=" << name << " pwd=" << pwd << std::endl;
return true;
}

// 重写UserServiceRpc的虚函数
// 这个版本的Login执行:接收远端调用 - 调用执行本地任务 - 写入响应 - 执行回调
void Login(::google::protobuf::RpcController* controller,
const ::Kuser::LoginRequest* request,
::Kuser::LoginResponse* response,
::google::protobuf::Closure* done){
// 接收远端调用的参数
std::string name = request->name();
std::string pwd = request->pwd();

// 调用重载版本,执行本地任务
bool login_result = Login(name, pwd);

// 写入响应,参考user.proto中的定义
Kuser::ResultCode *code = response->mutable_result();
code->set_errcode(0);
code->set_errmsg("");
response->set_success(login_result);

// 执行回调(执行响应对象数据的序列化和网络发送,交给框架来完成)
done->Run();
}
};


int main(int argc, char** argv){
// 调用框架的初始化操作
KrpcApplication::Init(argc, argv);

// provider将UserService发布到rpc节点上
KrpcProvider provider;
provider.NotifyService(new UserService());

// 启动rpc服务发布节点。Run以后进程进入阻塞状态,等待远程的rpc调用请求
provider.Run();

return 0;
}

UserService

UserService继承自proto文件生成的UserServiceRpc,为具体服务,其中定义了服务中Login方法的两个具体实现:

  • 一个版本的Login负责执行本地任务
  • 一个版本的Login负责接收远端调用,之后调用重载版本执行本地任务,最后将执行结果写入响应,并阻塞等待回调

main

main函数主要执行以下工作:

  1. 调用框架的初始化操作,从配置文件中读取必要的配置信息
  2. 将上面定义的UserService发布到Rpc节点上
  3. 启动Rpc服务发布节点,阻塞等待远端客户端调用该服务

为服务端提供服务的类

KrpcApplication

KrpcApplication主要负责框架的初始化操作,有以下几个注意点:

  • KrpcApplication为一个单例类
  • 初始化操作具体为:从命令行中解析出配置文件路径 → 调用KrpcConfig类载入配置文件,完成各参数的初始化
  • 初始化的配置项包括zookeeper服务器的ip和端口,以及Rpc服务端自己的ip和端口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// .h
#pragma once
#include "KrpcConfig.h"
#include "KrpcChannel.h"
#include "KrpcController.h"
#include <mutex>

// Krpc基础类,负责框架的初始化操作
class KrpcApplication{
public:
// 初始化框架
static void Init(int argc, char** argv);

// 单例模式,返回单例对象
static KrpcApplication& GetInstance();
// 删除单例
static void DeleteInstance();

// 获取配置
static KrpcConfig& GetConfig();

private:
// 单例模式,私有的构造函数
KrpcApplication(){};
~KrpcApplication(){};

// 删除拷贝与移动构造
KrpcApplication(const KrpcApplication&) = delete;
KrpcApplication(KrpcApplication&&) = delete;

private:
// 配置
static KrpcConfig m_config;

// 单例对象
static KrpcApplication* m_application;

// 互斥量
static std::mutex m_mutex;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// .cpp
#include "KrpcApplication.h"
#include <unistd.h>
#include <iostream>


// 再次声明静态变量
KrpcConfig KrpcApplication::m_config;
std::mutex KrpcApplication::m_mutex;
// 懒汉模式初始化单例对象
KrpcApplication* KrpcApplication::m_application = nullptr;


// 初始化框架
void KrpcApplication::Init(int argc, char** argv){
std::cout << "KrpcApplication::Init" << std::endl;
if(argc < 2){
// -i后必须有配置文件路径,文件中记录zookeeper服务器的ip和端口,以及服务器的ip和端口
std::cout << "format::command -i <configfile>" << std::endl;
exit(EXIT_FAILURE);
}

int opt;
std::string config_file;
// getopt用于解析命令行字符,第三个参数表示接收的参数,这里只指定i
while(-1 != (opt = getopt(argc, argv, "i:"))){
switch(opt){
case 'i':{
std::cout << "KrpcApplication: case -i" << std::endl;
// -i表示指定配置文件路径
// optarg为命令行参数对应的值,这里即为指定的配置文件路径
config_file = optarg;
break;
}
case '?':{
// 不接受i以外的命令行参数
std::cout << "format::command -i <configfile>" << std::endl;
exit(EXIT_FAILURE);
break;
}
case ':':{
// 出现了i但后面没有对应的值
std::cout << "format::command -i <configfile>" << std::endl;
exit(EXIT_FAILURE);
break;
}
default:
break;
}
}
// 从配置文件中载入配置项
m_config.LoadConfigFile(config_file.c_str());
}

// 单例模式,返回单例对象
KrpcApplication& KrpcApplication::GetInstance(){
// 获取单例时上锁
std::lock_guard<std::mutex> lock(m_mutex);

if(!m_application){
m_application = new KrpcApplication();
// 设置程序退出时自动销毁单例对象
atexit(DeleteInstance);
}

return *m_application;
}

// 删除单例
void KrpcApplication::DeleteInstance(){
if(m_application){
delete m_application;
}
}

// 获取配置
KrpcConfig& KrpcApplication::GetConfig(){
return m_config;
}

KrpcConfig

KrpcConfig主要负责实际的配置项初始化操作,具体就是从配置文件中逐行读入配置项,逻辑比较简单

值得学习的点有使用智能指针管理文件指针,这样可以在初始化结束后自动关闭文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// .h
#pragma once
#include <unordered_map>
#include <string>

class KrpcConfig{
public:
// 加载配置文件
void LoadConfigFile(const char* config_file);

// 查找配置项对应的值
std::string Load(const std::string& key) const;

private:
// 辅助函数,用于去除字符串前后的空格
void Trim(std::string& read_buf);

private:
// 存放配置项的容器
std::unordered_map<std::string, std::string> m_configs;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// .cpp
#include "KrpcConfig.h"
#include <memory>

// 加载配置文件
void KrpcConfig::LoadConfigFile(const char* config_file){
// 使用智能指针管理文件指针,并指定删除器为fclose
std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(config_file, "r"), fclose);
if(pf == nullptr){
perror("fopen");
exit(EXIT_FAILURE);
}

// 逐行读取配置项
char buf[1024];
while(fgets(buf, 1024, pf.get()) != nullptr){
std::string read_buf(buf);
Trim(read_buf);
// 跳过空行与注释行
if(read_buf.empty() || read_buf[0] == '#'){
continue;
}

// '='前为配置名,后为配置值
int ind = read_buf.find('=');
if(ind == -1){
continue;
}
// 解析key
std::string key = read_buf.substr(0, ind);
Trim(key);
// 解析value
int endInd = read_buf.find('\n', ind);
std::string value = read_buf.substr(ind + 1, endInd - ind - 1);
Trim(value);

// 插入配置项
m_configs[key] = value;
}
}

// 查找配置项对应的值
std::string KrpcConfig::Load(const std::string& key) const {
auto it = m_configs.find(key);
if(it == m_configs.end()){
return "";
}
return it->second;
}

// 辅助函数,用于去除字符串前后的空格
void KrpcConfig::Trim(std::string& read_buf){
// 去除前导空格
int ind = read_buf.find_first_not_of(' ');
if(ind != -1){
read_buf = read_buf.substr(ind, read_buf.size() - ind);
}

// 去除尾部空格
ind = read_buf.find_last_not_of(' ');
if(ind != -1){
read_buf = read_buf.substr(0, ind + 1);
}
}

KrpcProvider

Rpc服务端的核心函数类,提供发布Rpc方法、启动Rpc服务节点等功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// .h
#pragma once
#include "google/protobuf/service.h"
#include "zookeeperutil.h"
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpConnection.h>
#include <google/protobuf/descriptor.h>
#include <functional>
#include <string>
#include <unordered_map>

class KrpcProvider{
public:
~KrpcProvider();

// 提供给外部使用,用于发布rpc方法
// 多态:所有服务都继承自google::protobuf::Service
void NotifyService(google::protobuf::Service* service);

// 启动Rpc服务节点,开始提供Rpc远程调用服务
void Run();

private:
// 服务结构体,用于保存具体的服务对象和它的方法
struct ServiceInfo{
google::protobuf::Service* service;
std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> method_map;
};

// 处理新连接
void OnConnection(const muduo::net::TcpConnectionPtr& conn);

// 处理已有连接上发来的消息
void OnMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp receive_time);

// 发送Rpc响应
void SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message* response);

private:
muduo::net::EventLoop m_eventloop;

// 保存服务对象的容器
std::unordered_map<std::string, ServiceInfo> m_services;
};

KrpcProvider如何管理服务对象与其方法?

  • 使用unordered_map存放ServiceInfo,管理所有服务
  • 每个ServiceInfo也使用一个unordered_map存放method描述符,管理该服务下的所有方法

析构函数

1
2
3
4
5
KrpcProvider::~KrpcProvider(){
// 析构函数,停止Event Loop
std::cout << "~KrpcProvider()" << std::endl;
m_eventloop.quit();
}

NotifyService

NotifyService提供给外部使用,用于在Rpc服务端上注册RPC服务

  • 将传入的服务保存到m_services中,等待之后调用Run时发布到zookeeper服务器上

多态思想的利用:

  • 所有服务都继承自google::protobuf::Service,所以NotifyService能接受任何类型的服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void KrpcProvider::NotifyService(google::protobuf::Service* service){
// 存放服务对象及其方法的结构体
ServiceInfo service_info;

// 利用多态返回服务类的描述信息
const google::protobuf::ServiceDescriptor* psd = service->GetDescriptor();

// 将服务对象的方法都存入service_info
int method_cnt = psd->method_count();
for(int i = 0; i < method_cnt; ++i){
const google::protobuf::MethodDescriptor* pmd = psd->method(i);
service_info.method_map.emplace(pmd->name(), pmd);
}

// 将服务对象放入容器进行管理
m_services.emplace(psd->name(), service_info);
}

Run

Run的主要工作:

  1. 从配置文件中读取Rpc服务器的ip和端口
  2. 调用muduo库接口创建TcpServer对象,并分别绑定连接事件和消息事件,实现网络连接业务和消息处理业务的分离
  3. m_services注册的服务全部发布到zookeeper服务器上,让Rpc客户端可以从zookeeper上发现Rpc服务端提供的服务
  4. 所有服务都完成发布后,启动muduo库网络服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void KrpcProvider::Run(){
// 从配置文件中读取Rpc服务端的ip和端口
std::string ip = KrpcApplication::GetInstance().GetConfig().Load("rpcserverip");
int port = atoi(KrpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
// 创建地址
muduo::net::InetAddress address(ip, port);

// 创建TcpServer对象
auto server = std::make_shared<muduo::net::TcpServer>(&m_eventloop, address, "KrpcProvider");

// 分别绑定连接事件和消息事件
server->setConnectionCallback(std::bind(&KrpcProvider::OnConnection, this, std::placeholders::_1));
server->setMessageCallback(std::bind(&KrpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

// 设置线程数量为4
server->setThreadNum(4);

// 把当前Rpc服务端的服务全部注册到zookeeper上,使得Rpc客户端能够从zookeeper上发现服务
ZkClient zkclient;
zkclient.Start();
std::cout << "KrpcProvider: zkclient Start success!" << std::endl;
// service_name为永久节点,method_name为临时节点
for(auto& service: m_services){
// service_name的路径: /service_name
std::string service_path = "/" + service.first;
std::cout << "KrpcProvider: zkclient Create znode: " << service_path << std::endl;
zkclient.Create(service_path.c_str(), nullptr, 0);

// 创建method_name节点
for(auto& method: service.second.method_map){
std::string method_path = service_path + "/" + method.first;
char method_path_data[128] = {0};
// 写入节点内容:ip + 端口
sprintf(method_path_data, "%s:%d", ip.c_str(), port);
// zookeeper上创建临时节点
zkclient.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}

// Rpc服务端准备启动
std::cout << "RpcProvider start service at ip:" << ip << ", port:" << port << std::endl;

// 启动muduo网络服务
server->start();
m_eventloop.loop();
}

OnConnection

如果连接失效,则调用shutdown断开连接,除此不对连接事件做特殊处理

1
2
3
4
5
6
void KrpcProvider::OnConnection(const muduo::net::TcpConnectionPtr& conn){
// 不做特别处理
if(!conn->connected()){
conn->shutdown();
}
}

OnMessage

OnMessage的主要工作:

  1. 处理Tcp粘包问题,反序列化从字节流中解析出参数
  2. 根据参数找到对应的服务与方法
  3. 生成Rpc方法调用的请求和响应,调用本地的方法,并通过回调函数发送响应

如何处理Tcp粘包问题?

将字节流分割为以下几部分:

  • header_size: 固定4字节,记录header_str的长度
  • header_str: 记录服务名、方法名、参数长度(KrpcHeader.proto中定义)
  • arg_str: 用于调用方法的参数

什么是NewCallback

NewCallback函数会返回一个google::protobuf::Closure类的对象,可以理解为定义了一个回调函数

Closure类对象相当于一个闭包,它捕获了以下内容:

  • 一个成员对象的成员函数(这里为SendRpcResponse
  • 以及这个成员函数需要的参数(这里为connresponse

什么是CallMethod

CallMethod在UserServiceRpc中实现(proto自动生成),功能为根据远端Rpc请求,调用当前Rpc节点上发布的方法

requestresponse中包含了调用method的参数,done是执行完method后的回调函数,这里指定了SendRpcResponse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
void KrpcProvider::OnMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp receive_time){
std::cout << "OnMessage" << std::endl;

// 接收远端调用的字节流
std::string recv_buf = buffer->retrieveAllAsString();
// ArrayInputStream: 将字节流包装为一个可读取的输入溜
google::protobuf::io::ArrayInputStream raw_input(recv_buf.data(), recv_buf.size());
// CodedInputStream: 提供高效的二进制流解析工具
google::protobuf::io::CodedInputStream coded_input(&raw_input);

// 读取4字节的header_size
uint32_t header_size{};
coded_input.ReadVarint32(&header_size);

// 根据header_size的值读取header_str,并对其反序列化,得到Rpc请求的详细信息(既服务名、方法名、参数大小)
std::string rpc_header_str;
Krpc::RpcHeader krpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size{};
// 设置读取规则,读取header_str
google::protobuf::io::CodedInputStream::Limit msg_limit = coded_input.PushLimit(header_size);
coded_input.ReadString(&rpc_header_str, header_size);
// 恢复规则,便于之后安全地读取其他数据
coded_input.PopLimit(msg_limit);
// 反序列化,解析KrpcHeader
if(krpcHeader.ParseFromString(rpc_header_str)){
service_name = krpcHeader.service_name();
method_name = krpcHeader.method_name();
args_size = krpcHeader.args_size();
} else {
std::cout << "Error: krpcHeader parse error" << std::endl;
return;
}

// 用于调用Rpc方法的参数
std::string args_str;
// 读取args_size长度的字符串
if(!coded_input.ReadString(&args_str, args_size)){
std::cout << "Error: read args error" << std::endl;
return;
}

// 在Rpc服务端中搜索service对象和method对象
auto sit = m_services.find(service_name);
if(sit == m_services.end()){
std::cout << service_name << " is no exist!" << std::endl;
return;
}
auto mit = sit->second.method_map.find(method_name);
if(mit == sit->second.method_map.end()){
std::cout << method_name << " is no exist!" << std::endl;
return;
}
// 获取服务对象与方法对象
google::protobuf::Service* service = sit->second.service;
const google::protobuf::MethodDescriptor* method = mit->second;

// 生成Rpc方法调用的请求(request)和响应(response)
// 通过 GetRequestPrototype,可以根据方法描述符动态获取对应的请求消息类型,并New()实例化该类型的对象
google::protobuf::Message* request = service->GetRequestPrototype(method).New();
if(!request->ParseFromString(args_str)){
std::cout << service_name << '.' << method_name << " parse error!" << std::endl;
return;
}
google::protobuf::Message* response = service->GetResponsePrototype(method).New();

// NewCallback函数会返回一个google::protobuf::Closure类的对象
// Closure类对象相当于一个闭包,它捕获了一个成员对象的成员函数(SendRpcResponse),以及这个成员函数需要的参数(conn、response))
google::protobuf::Closure* done = google::protobuf::NewCallback<KrpcProvider,
const muduo::net::TcpConnectionPtr&,
google::protobuf::Message*>(this, &KrpcProvider::SendRpcResponse,
conn,
response);

// CallMethod在UserServiceRpc实现,功能为根据远端Rpc请求,调用当前Rpc节点上发布的方法
// request与response中包含了调用method的参数,done是执行完method后的回调函数,这里指定了SendRpcResponse
service->CallMethod(method, nullptr, request, response, done);
}

SendRpcResponse

SendRpcResponse作为OnMessage中CallMethod的回调函数,在执行完远端Rpc调用的方法后调用。其功能是序列化响应信息,并通过send发送给Rpc客户端

1
2
3
4
5
6
7
8
9
void KrpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message* response){
// 序列化响应字符串,并将其发送给Rpc调用方
std::string response_str;
if(response->SerializeToString(&response_str)){
conn->send(response_str);
} else {
std::cout << "Serialize Error!" << std::endl;
}
}

zookeeperutil

zookeeperutil主要对zookeeper提供的一些api进行封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// .h
#pragma once
#include <semaphore.h>
#include <zookeeper/zookeeper.h>
#include <string>

// zookeeper客户端,主要封装一些zookeeper相关的api
class ZkClient{
public:
ZkClient();
~ZkClient();

// zk客户端启动,连接zk服务器。封装zookeeper_init
void Start();

// 在zk服务器中根据path新建一个节点。封装zoo_create
void Create(const char* path, const char* data, int datalen, int state = 0);

// 根据指定的路径,获取znode节点值。封装zoo_get
std::string GetData(const char* path);

private:
// zk客户端的句柄
zhandle_t* m_zhandle;
};

global_watcher

watcher机制

global_watcher定义了一个全局的watcher观察器,当znode节点发生变化时,zk服务端会通过该回调函数通知zk客户端

这里的global_watcher只处理 type==ZOO_SESSION_EVENT && state==ZOO_CONNECTED_STATE 的watcher事件。目的是保证Start()调用完成后zk客户端(即Rpc服务端)与zk服务器的连接已经建立完成了

为什么需要watcher机制提供保证?

因为zk客户端与zk服务器的连接建立过程是异步的。zookeeper_mt库的zookeeper客户端使用了以下三个线程:

  • 主线程:用户调用API的线程。
  • IO线程:负责网络通信的线程。
  • completion线程:对于异步请求(Zookeeper中提供的异步API,一般都是以zoo_a开头的api)以及watcher的响应回调,io线程会发送给completion线程完成处理。

主线程在zk客户端调用api后返回zk句柄,而此时IO线程可能还没有完成连接的建立。所以需要watcher机制配合条件变量来保证Start()调用结束前连接的建立。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// .cpp
#include "zookeeperutil.h"
#include <mutex>
#include <condition_variable>
#include <iostream>
#include "KrpcApplication.h"


// 全局锁
std::mutex cv_mutex;
// 条件变量
std::condition_variable cv;
// 标记zk客户端是否已经连接到zk服务器
bool isConnected = false;

// 全局的watcher观察器,当节点发生变化时,zk服务端会通过该回调函数通知zk客户端
void global_watcher(zhandle_t* zh, int type, int state, const char* path, void* watcher_ctx){
// 只处理 type==ZOO_SESSION_EVENT && state==ZOO_CONNECTED_STATE 的watcher事件
if(type == ZOO_SESSION_EVENT){
if(state == ZOO_CONNECTED_STATE){
std::lock_guard<std::mutex> lock(cv_mutex);
// 标记zk客户端已经与zk服务端建立连接
isConnected = true;
}
}
// 唤醒条件变量
cv.notify_all();
}

// 构造函数,初始化zk客户端句柄
ZkClient::ZkClient(): m_zhandle(nullptr){
}

// 析构函数,关闭zk客户端句柄
ZkClient::~ZkClient(){
if(m_zhandle != nullptr){
zookeeper_close(m_zhandle);
}
}

// zk客户端启动,连接zk服务器。封装zookeeper_init
void ZkClient::Start(){
// 从配置文件中获取zookeeper服务器的ip和端口
std::string host = KrpcApplication::GetInstance().GetConfig().Load("zookeeperip");
std::string port = KrpcApplication::GetInstance().GetConfig().Load("zookeeperport");
// 拼接 ip + port
std::string conn_str = host + port;

// 初始化zk对象,异步建立zk客户端(即Rpc服务端)与zk服务器的连接
m_zhandle = zookeeper_init(conn_str.c_str(), global_watcher, 6000, nullptr, nullptr, 0);
if(m_zhandle){
// 初始化失败
std::cout << "zookeeper_init error!" << std::endl;
exit(EXIT_FAILURE);
}

// 等待global_watcher回调通知连接已经建立完成(isConnected == true)
std::unique_lock<std::mutex> lock(cv_mutex);
// 第二个参数用于防止虚假唤醒
cv.wait(lock, []{return isConnected;});
std::cout << "zookeeper_init success!" << std::endl;
}

// 在zk服务器中根据path新建一个节点。封装zoo_create
void ZkClient::Create(const char* path, const char* data, int datalen, int state = 0){
// 创建znode节点,可以选择永久性节点还是临时性节点
char path_buffer[128];
int bufferlen = sizeof(path_buffer);

// 检查指定的节点是否存在,只有不存在时才创建节点
int flag = zoo_exists(m_zhandle, path, 0, nullptr);
if(flag == ZNONODE){
// 创建指定path的znode节点
flag = zoo_create(m_zhandle, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
if(flag == ZOK){
std::cout << "znode create success, path=" << path << std::endl;
} else {
std::cout << "znode create fail, path=" << path << std::endl;
exit(EXIT_FAILURE);
}
}
}

// 根据指定的路径,获取znode节点值。封装zoo_get
std::string ZkClient::GetData(const char* path){
char buffer[64];
int bufferlen = sizeof(buffer);
int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
if(flag == ZOK){
return buffer;
} else {
std::cout << "zoo_get error!" << std::endl;
return "";
}

return "";
}

KrpcController

KrpcController的主要作用是跟踪RPC方法调用的状态、错误信息并提供控制功能(如取消调用)。这里只实现了其最基本的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// .h
#pragma once
#include <google/protobuf/service.h>
#include <string>

// 描述Rpc调用的控制器,主要作用是跟踪RPC方法调用的状态、错误信息并提供控制功能(如取消调用)
class KrpcController{
public:
KrpcController();

// 重置状态与错误信息
void Reset();

// 返回当前状态是否为错误
bool Failed() const { return m_failed; }

// 返回错误信息
std::string ErrorText() const { m_errmsg; }

// 设置状态与错误信息
void SetFailed(const std::string& reason);

// 下面是一些还未实现的功能
void StartCancel();
bool IsCanceled() const;
void NotifyOnCancel(google::protobuf::Closure* callback);

private:
// RPC方法执行过程中的状态
bool m_failed;

// RPC方法执行过程中的错误信息
std::string m_errmsg;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// .cpp
#include "KrpcController.h"

KrpcController::KrpcController()
: m_failed(false)
, m_errmsg(""){
}

// 重置状态与错误信息
void KrpcController::Reset(){
m_failed = false;
m_errmsg = "";
}

// 设置状态与错误信息
void KrpcController::SetFailed(const std::string& reason){
m_failed = true;
m_errmsg = reason;
}

// 下面是一些还未实现的功能
void KrpcController::StartCancel(){

}
bool KrpcController::IsCanceled() const {
return false;
}
void KrpcController::NotifyOnCancel(google::protobuf::Closure* callback){

}

RPC客户端

KrpcChannel

多态的应用

UserServiceRpc_Stub的构造函数必须传入一个google::protobuf::RpcChannel。所以我们必须自己实现一个KrpcChannel继承自google::protobuf::RpcChannel ,并实现它的CallMethod方法

CallMethod如何理解?

  • 客户端视角CallMethod 是客户端存根(Stub)类调用的入口,负责将本地方法调用(如这里的Login)的参数序列化为网络传输格式,并通过网络发送给服务端(简单理解就是将本地方法调用转换为远程过程调用

    • 客户端所有服务方法的调用最终都会转变为对CallMethod的调用,如Login:

    UserServiceRpc_stub::Login源码

  • 服务端视角:服务端通过 CallMethod 接收请求后,根据消息头中的服务名和方法名路由到具体的服务实现,并触发本地方法执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// .h
#pragma once
#include <google/protobuf/service.h>
#include "zookeeperutil.h"


// 继承自google::protobuf::RpcChannel
// 目的是为了给客户端进行方法调用的时候,统一接收的
class KrpcChannel: public google::protobuf::RpcChannel{
public:
// 构造函数
KrpcChannel(bool connectNow);

// 析构函数
virtual ~KrpcChannel(){};

// 重写继承的CallMethod
void CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done) override;

private:
// 建立与Rpc服务端的连接
bool newConnect(const char* ip, uint16_t port);

// 向zookeeper服务器查询服务方法对应的Rpc服务端ip和端口
std::string QueryServiceHost(ZkClient* zkclient, const std::string& service_name,
const std::string& method_name, int& idx);

private:
// 客户端通信的socket
int m_clientSock;
// 服务名
std::string m_service_name;
// 方法名
std::string m_method_name;
// Rpc服务端的ip和端口
std::string m_ip;
uint16_t m_port;
// 划分服务器ip和端口的下标
int m_idx;
};

构造函数

如果已经处于连接状态(connectNow == True),则尝试与Rpc服务端进行连接

  • 重试机制:当连接失败时,可以重试3次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
KrpcChannel::KrpcChannel(bool connectNow)
: m_clientSock(-1)
, m_idx(0){
if(!connectNow){
return;
}

// 尝试与Rpc服务端进行连接,可以重试3次
auto rt = newConnect(m_ip.c_str(), m_port);
int cnt = 3;
while(!rt && cnt--){
rt = newConnect(m_ip.c_str(), m_port);
}
}

CallMethod

CallMethod的主要工作:

  1. 连接Rpc服务器 :查询zookeeper服务器获取ip和端口 → 调用newConnect连接服务端
  2. 序列化请求:打包header_size、header_str、args_size、args_str
  3. 发送请求:send
  4. 接受响应:recv
  5. 解析响应数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
void KrpcChannel::CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done){
// 建立与Rpc服务端的连接
if(m_clientSock == -1){
// 获取服务对象名和方法名
const google::protobuf::ServiceDescriptor* sd = method->service();
m_service_name = sd->name();
m_method_name = method->name();

// 向zookeeper服务器查询服务对象和方法对应的服务端host
ZkClient zkCli;
zkCli.Start();
std::string server_host = QueryServiceHost(&zkCli, m_service_name, m_method_name, m_idx);

// 解析host
m_ip = server_host.substr(0, m_idx);
m_port = atoi(server_host.substr(m_idx + 1, server_host.size() - m_idx).c_str());
std::cout << "Server ip: " << m_ip << ", port: " << m_port << std::endl;

// 建立连接
bool rt = newConnect(m_ip.c_str(), m_port);
if(rt) {
std::cout << "connect server success" << std::endl;
} else {
std::cout << "connect server error" << std::endl;
// 连接建立失败时返回
return;
}
}

// 封装KrpcHeader
Krpc::RpcHeader krpcHeader;
// - service_name
krpcHeader.set_service_name(m_service_name);
// - method_name
krpcHeader.set_method_name(m_method_name);
// - args_size
uint32_t args_size{};
std::string args_str;
// 序列化参数到字符串
if(request->SerializeToString(&args_str)){
args_size = args_str.size();
} else {
// 设置KrpcController的错误信息
controller->SetFailed("serialize request fail");
return;
}
krpcHeader.set_args_size(args_size);

// 将(header_size、header_str、args_size、args_str)打包到send_rpc_str
std::string send_rpc_str;
uint32_t header_size = 0;
std::string rpc_header_str;
// 序列化KrpcHeader到字符串
if(krpcHeader.SerializeToString(&rpc_header_str)){
header_size = rpc_header_str.size();
} else {
controller->SetFailed("serialize rpc header fail");
return;
}

// 流式写入send_rpc_str
{
google::protobuf::io::StringOutputStream string_output(&send_rpc_str);
google::protobuf::io::CodedOutputStream coded_output(&string_output);
coded_output.WriteVarint32(static_cast<uint32_t>(header_size));
coded_output.WriteString(rpc_header_str);
}
send_rpc_str += args_str;

// 发送Rpc请求
if(send(m_clientSock, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1){
// 发送请求失败
close(m_clientSock);
perror("send");
controller->SetFailed("send error");
return;
}

// 接受Rpc请求的响应
char recv_buf[1024] = {0};
int recv_size = recv(m_clientSock, recv_buf, sizeof(recv_buf), 0);
if(recv_size == -1){
perror("recv");
controller->SetFailed("recv error");
return;
}

// 反序列化解析响应数据
if(!response->ParseFromArray(recv_buf, recv_size)){
// 解析失败
close(m_clientSock);
perror("parse");
controller->SetFailed("parse error");
return;
}

// 请求-响应完毕,关闭连接
close(m_clientSock);
}

newConnect

获得服务端的ip和port后,建立与Rpc服务端的TCP连接。常规的socket编程客户端connect流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
bool KrpcChannel::newConnect(const char* ip, uint16_t port){
// socket编程的客户端connect流程
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if(clientfd == -1){
perror("socket");
return false;
}

struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip);

if(connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1){
perror("KrpcChannel::newConnect: connect");
return false;
}

m_clientSock = clientfd;
return true;
}

QueryServiceHost

向zookeeper服务器查询服务方法对应的Rpc服务端的host

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
std::string KrpcChannel::QueryServiceHost(ZkClient* zkclient, const std::string& service_name, 
const std::string& method_name, int& idx){
std::string method_path = '/' + service_name + '/' + method_name;

// 上锁从zookeeper获取Rpc服务器host,保证多线程情况下每一个线程都能拿到信息
std::unique_lock<std::mutex> lock(g_mutex);
std::string server_host = zkclient->GetData(method_path.c_str());
lock.unlock();

// 判断host合法性
if(server_host == ""){
std::cout << "ERROR: " << method_path << " is no exist! \n";
// 不能返回空字符串,否则后续substr会出错
return " ";
}
// 以":"为分隔符分隔ip和port
idx = server_host.find(":");
if(idx == -1){
std::cout << "ERROR: " << method_path << " address is invalid! \n";
return " ";
}
return server_host;
}

客户端主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include "KrpcApplication.h"
#include "KrpcChannel.h"
#include "../user.pb.h"
#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <chrono>


// Rpc客户端远端调用Rpc服务端的服务方法
void send_request(int thread_id, std::atomic<int>& success_cnt, std::atomic<int>& fail_cnt){
Kuser::UserServiceRpc_Stub stub(new KrpcChannel(false));

// 设置Rpc请求参数
Kuser::LoginRequest request;
request.set_name("yu");
request.set_pwd("123456");
// 创建Rpc响应参数
Kuser::LoginResponse response;

// 远程调用Login方法
KrpcController controller;
// 这里Login实际就是通过KrpcChannel::CallMethod间接调用的
stub.Login(&controller, &request, &response, nullptr);
// 读取响应
if(controller.Failed){
// 调用失败
std::cout << controller.ErrorText() << std::endl;
} else {
if(response.result().errcode() == 0){
std::cout << "Rpc login response success: " << response.success() << std::endl;
++success_cnt;
} else {
std::cout << "Rpc login response success: " << response.result().errmsg() << std::endl;
++fail_cnt;
}
}
}


int main(int argc, char** argv){
// 程序启动后,调用KrpcApplication类进行初始化
KrpcApplication::Init(argc, argv);

// 设置线程配置属性
const int thread_cnt = 1000; // 并发线程数
const int request_per_thread = 10; // 每个线程发送的请求数

std::vector<std::thread> threads;
std::atomic<int> success_cnt(0);
std::atomic<int> fail_cnt(0);

// 获取程序开始时间
auto start_time = std::chrono::high_resolution_clock::now();

// 启动多线程并发测试
for(int i = 0; i < thread_cnt; ++i){
threads.emplace_back([argc, argv, i, &success_cnt, & fail_cnt](){
for(int j = 0; j < request_per_thread; ++j){
send_request(i, success_cnt, fail_cnt);
}
});
}

// 阻塞等待所有线程执行完成
for(auto& t : threads){
t.join();
}

// 获取程序结束时间,并计算程序执行时间
auto end_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;

// 输出统计结果
std::cout << "Total requests: " << thread_cnt * request_per_thread << std::endl;
std::cout << "Success count: " << success_cnt << std::endl;
std::cout << "Fail count: " << fail_cnt << std::endl;
std::cout << "Elapsed time: " << elapsed.count() << " seconds" << std::end;
std::cout << "QPS: " << (thread_cnt * request_per_thread) / elapsed.count() << std::endl;

return 0;
}

send_request

客户端通过send_request 远端调用Login方法

main

进行Rpc框架的多线程并发测试

性能测试

分别运行服务端、客户端即可:

1
2
3
4
5
6
7
8
# 1.在example文件夹和src文件夹中为proto文件生成.h和.cc文件

# 2.回到项目根目录,使用以下命令编译生成可执行文件
mkdir build && cd build && cmake .. && make -j${nproc}

# 3.进入bin文件夹,在不同会话中分别运行server和client
./server -i ./test.conf
./client -i ./test.conf

测试环境

  • 测试平台:阿里云服务器
  • 操作系统:Ubuntu 22.04
  • CPU:2核
  • 内存:2G
  • 硬盘:40G

测试结果

  • 并发线程数200,每个线程发送的请求数10

    1
    2
    3
    4
    5
    Total requests: 2000
    Success count: 2000
    Fail count: 0
    Elapsed time: 10.4225 seconds
    QPS: 191.892
  • 并发线程数1000,每个线程发送的请求数10

    1
    2
    3
    4
    5
    Total requests: 10000
    Success count: 10000
    Fail count: 0
    Elapsed time: 79.2326 seconds
    QPS: 126.21