Preface

RPC in brief

RPC (Remote Procedure Call) is a technique that lets a program invoke a remote service as if it were calling a local function. It hides the underlying communication details, so developers can call remote methods just like local ones, without dealing with the complexity of the remote call themselves.
References
Project repository
Project overview

Framework flow diagram

Code call flow
The three parties
ZooKeeper server
ZooKeeper acts here as the registry and configuration center for service methods: it manages the service methods that providers expose to the outside.

Both the RPC server and the RPC client act as ZooKeeper clients.
How ZooKeeper stores service objects and their methods:
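As a concrete example (the service and method names come from user.proto below; the address 127.0.0.1:8000 is only a placeholder), the znode tree created by the provider looks roughly like this:

```
/UserServiceRpc                  # one permanent znode per service, no data
├── /UserServiceRpc/Login        # one ephemeral znode per method
│       data: "127.0.0.1:8000"   # ip:port of the machine providing the service
└── /UserServiceRpc/Register
        data: "127.0.0.1:8000"
```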
RPC server
- The RPC server registers its service objects and service methods with ZooKeeper; what is registered is the ip and port of the machine that provides the service.
- After registration it starts an epoll-based listener and waits for remote call requests from clients.
- When a remote call arrives from an RPC client, it first deserializes the call arguments, then invokes the local method to handle the call, and finally wraps the result into a response, serializes it, and sends it back.
RPC client
- The RPC client first queries ZooKeeper for the ip and port of the RPC server that provides the target service.
- With the server's ip and port it connects to the target port; after that it is an ordinary send/recv flow, except that the request must be serialized before send and the response deserialized after recv.
Protobuf

Two proto files are defined.

KrpcHeader.proto

This file defines the metadata header of an RPC call.
```proto
syntax = "proto3";
package Krpc;

message RpcHeader {
    bytes service_name = 1;
    bytes method_name = 2;
    uint32 args_size = 3;
}
```
- service_name: identifies the target service (e.g. UserServiceRpc)
- method_name: identifies the target method (e.g. Login)
- args_size: the byte length of the argument data that follows, used to avoid TCP sticky-packet problems

After receiving a request, the server locates the corresponding service and method via service_name and method_name.
user.proto

This file defines the UserServiceRpc service and its two methods, Login and Register. The fields of LoginRequest and RegisterRequest can be changed to fit your own needs.
```proto
syntax = "proto3";
package Kuser;

option cc_generic_services = true;

message ResultCode {
    int32 errcode = 1;
    bytes errmsg = 2;
}

message LoginRequest {
    bytes name = 1;
    bytes pwd = 2;
}

message LoginResponse {
    ResultCode result = 1;
    bool success = 2;
}

message RegisterRequest {
    uint32 id = 1;
    bytes name = 2;
    bytes pwd = 3;
}

message RegisterResponse {
    ResultCode result = 1;
    bool success = 2;
}

service UserServiceRpc {
    rpc Login(LoginRequest) returns (LoginResponse);
    rpc Register(RegisterRequest) returns (RegisterResponse);
}
```
Login and Register

- Login takes two parameters: name and pwd
- Register takes three parameters: id, name, and pwd
cc_generic_services

With cc_generic_services enabled, protoc generates two C++ classes (sketched below):

- UserServiceRpc: the callee inherits from this class and implements the Login and Register methods
- UserServiceRpc_Stub: the client-side stub class used by the caller; it issues calls through an RpcChannel
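For orientation, the two generated classes look roughly like this (a simplified sketch of user.pb.h, not the exact generated code):

```cpp
// Simplified sketch of what protoc generates in user.pb.h.
class UserServiceRpc : public ::google::protobuf::Service {
public:
    // The callee overrides these with the real business logic.
    virtual void Login(::google::protobuf::RpcController* controller,
                       const ::Kuser::LoginRequest* request,
                       ::Kuser::LoginResponse* response,
                       ::google::protobuf::Closure* done);
    virtual void Register(::google::protobuf::RpcController* controller,
                          const ::Kuser::RegisterRequest* request,
                          ::Kuser::RegisterResponse* response,
                          ::google::protobuf::Closure* done);
    // Dispatches an incoming request to Login/Register via the method descriptor.
    void CallMethod(const ::google::protobuf::MethodDescriptor* method,
                    ::google::protobuf::RpcController* controller,
                    const ::google::protobuf::Message* request,
                    ::google::protobuf::Message* response,
                    ::google::protobuf::Closure* done) override;
};

class UserServiceRpc_Stub : public UserServiceRpc {
public:
    // The caller constructs the stub with a concrete RpcChannel (KrpcChannel here);
    // calling stub.Login(...) forwards to channel_->CallMethod(...).
    explicit UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel);
private:
    ::google::protobuf::RpcChannel* channel_;
};
```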
Generating the code

Run the following command to generate the C++ code for the proto file:
```sh
protoc user.proto -I ./ --cpp_out=./user
```
The generated user.pb.h and user.pb.cc will be placed in the ./user folder.
RPC server

Server main file Kserver.cpp

The main file consists of two parts: UserService and the main function.
```cpp
#include <iostream>
#include <string>
#include "../user.pb.h"
#include "KrpcProvider.h"
#include "KrpcApplication.h"

class UserService : public Kuser::UserServiceRpc {
public:
    // Local business logic
    bool Login(std::string name, std::string pwd) {
        std::cout << "doing local service: Login" << std::endl;
        std::cout << "name=" << name << " pwd=" << pwd << std::endl;
        return true;
    }

    // Overridden virtual method, called by the framework for remote requests
    void Login(::google::protobuf::RpcController* controller,
               const ::Kuser::LoginRequest* request,
               ::Kuser::LoginResponse* response,
               ::google::protobuf::Closure* done) {
        std::string name = request->name();
        std::string pwd = request->pwd();

        bool login_result = Login(name, pwd);

        Kuser::ResultCode* code = response->mutable_result();
        code->set_errcode(0);
        code->set_errmsg("");
        response->set_success(login_result);

        done->Run();
    }
};

int main(int argc, char** argv) {
    KrpcApplication::Init(argc, argv);
    KrpcProvider provider;
    provider.NotifyService(new UserService());
    provider.Run();
    return 0;
}
```
UserService

UserService inherits from the protoc-generated UserServiceRpc and is the concrete service. It defines two implementations of the Login method:

- one Login overload performs the actual local work
- the other Login overload receives the remote call, invokes the local overload to do the work, writes the result into the response, and then triggers the callback via done->Run()
main

The main function does the following:

- initializes the framework, reading the required settings from the configuration file
- registers the UserService defined above on the RPC node
- starts the RPC service publishing node and blocks, waiting for remote clients to call the service
Classes backing the server side

KrpcApplication

KrpcApplication is responsible for framework initialization. A few points worth noting:

- KrpcApplication is a singleton class
- initialization parses the configuration file path from the command line, then has KrpcConfig load the file and initialize each parameter
- the configuration items include the ZooKeeper server's ip and port, as well as the RPC server's own ip and port
```cpp
#pragma once
#include "KrpcConfig.h"
#include "KrpcChannel.h"
#include "KrpcController.h"
#include <mutex>

class KrpcApplication {
public:
    static void Init(int argc, char** argv);
    static KrpcApplication& GetInstance();
    static void DeleteInstance();
    static KrpcConfig& GetConfig();

private:
    KrpcApplication() {}
    ~KrpcApplication() {}
    KrpcApplication(const KrpcApplication&) = delete;
    KrpcApplication(KrpcApplication&&) = delete;

private:
    static KrpcConfig m_config;
    static KrpcApplication* m_application;
    static std::mutex m_mutex;
};
```
```cpp
#include "KrpcApplication.h"
#include <unistd.h>
#include <iostream>

KrpcConfig KrpcApplication::m_config;
std::mutex KrpcApplication::m_mutex;
KrpcApplication* KrpcApplication::m_application = nullptr;

void KrpcApplication::Init(int argc, char** argv) {
    std::cout << "KrpcApplication::Init" << std::endl;
    if (argc < 2) {
        std::cout << "format::command -i <configfile>" << std::endl;
        exit(EXIT_FAILURE);
    }
    int opt;
    std::string config_file;
    while (-1 != (opt = getopt(argc, argv, "i:"))) {
        switch (opt) {
            case 'i': {
                std::cout << "KrpcApplication: case -i" << std::endl;
                config_file = optarg;
                break;
            }
            case '?': {
                std::cout << "format::command -i <configfile>" << std::endl;
                exit(EXIT_FAILURE);
                break;
            }
            case ':': {
                std::cout << "format::command -i <configfile>" << std::endl;
                exit(EXIT_FAILURE);
                break;
            }
            default:
                break;
        }
    }
    m_config.LoadConfigFile(config_file.c_str());
}

KrpcApplication& KrpcApplication::GetInstance() {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (!m_application) {
        m_application = new KrpcApplication();
        atexit(DeleteInstance);
    }
    return *m_application;
}

void KrpcApplication::DeleteInstance() {
    if (m_application) {
        delete m_application;
    }
}

KrpcConfig& KrpcApplication::GetConfig() {
    return m_config;
}
```
KrpcConfig

KrpcConfig performs the actual configuration loading: it reads the configuration items from the file line by line, and the logic is fairly simple. One point worth learning from is the use of a smart pointer to manage the FILE*, so the file is closed automatically once loading finishes.
```cpp
#pragma once
#include <unordered_map>
#include <string>

class KrpcConfig {
public:
    void LoadConfigFile(const char* config_file);
    std::string Load(const std::string& key) const;

private:
    void Trim(std::string& read_buf);

private:
    std::unordered_map<std::string, std::string> m_configs;
};
```
```cpp
#include "KrpcConfig.h"
#include <memory>
#include <cstdio>
#include <cstdlib>

void KrpcConfig::LoadConfigFile(const char* config_file) {
    // The FILE* is owned by a unique_ptr, so it is closed automatically.
    std::unique_ptr<FILE, decltype(&fclose)> pf(fopen(config_file, "r"), fclose);
    if (pf == nullptr) {
        perror("fopen");
        exit(EXIT_FAILURE);
    }
    char buf[1024];
    while (fgets(buf, 1024, pf.get()) != nullptr) {
        std::string read_buf(buf);
        Trim(read_buf);
        if (read_buf.empty() || read_buf[0] == '#') {
            continue;
        }
        int ind = read_buf.find('=');
        if (ind == -1) {
            continue;
        }
        std::string key = read_buf.substr(0, ind);
        Trim(key);
        int endInd = read_buf.find('\n', ind);
        std::string value = read_buf.substr(ind + 1, endInd - ind - 1);
        Trim(value);
        m_configs[key] = value;
    }
}

std::string KrpcConfig::Load(const std::string& key) const {
    auto it = m_configs.find(key);
    if (it == m_configs.end()) {
        return "";
    }
    return it->second;
}

void KrpcConfig::Trim(std::string& read_buf) {
    int ind = read_buf.find_first_not_of(' ');
    if (ind != -1) {
        read_buf = read_buf.substr(ind, read_buf.size() - ind);
    }
    ind = read_buf.find_last_not_of(' ');
    if (ind != -1) {
        read_buf = read_buf.substr(0, ind + 1);
    }
}
```
KrpcProvider

The core class of the RPC server: it publishes RPC methods and starts the RPC service node.
```cpp
#pragma once
#include "google/protobuf/service.h"
#include "zookeeperutil.h"
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpConnection.h>
#include <google/protobuf/descriptor.h>
#include <functional>
#include <string>
#include <unordered_map>

class KrpcProvider {
public:
    ~KrpcProvider();
    void NotifyService(google::protobuf::Service* service);
    void Run();

private:
    struct ServiceInfo {
        google::protobuf::Service* service;
        std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> method_map;
    };
    void OnConnection(const muduo::net::TcpConnectionPtr& conn);
    void OnMessage(const muduo::net::TcpConnectionPtr& conn,
                   muduo::net::Buffer* buffer,
                   muduo::Timestamp receive_time);
    void SendRpcResponse(const muduo::net::TcpConnectionPtr& conn,
                         google::protobuf::Message* response);

private:
    muduo::net::EventLoop m_eventloop;
    std::unordered_map<std::string, ServiceInfo> m_services;
};
```
How does KrpcProvider manage service objects and their methods?

- an unordered_map stores one ServiceInfo per service, keyed by service name
- each ServiceInfo in turn holds an unordered_map of method descriptors, keyed by method name, covering all methods of that service
Destructor

```cpp
KrpcProvider::~KrpcProvider() {
    std::cout << "~KrpcProvider()" << std::endl;
    m_eventloop.quit();
}
```
NotifyService

NotifyService is exposed to the outside and is used to register an RPC service on the RPC server: the service passed in is stored in m_services and published to the ZooKeeper server later, when Run is called.

Use of polymorphism: every service inherits from google::protobuf::Service, so NotifyService can accept a service of any type.
```cpp
void KrpcProvider::NotifyService(google::protobuf::Service* service) {
    ServiceInfo service_info;
    const google::protobuf::ServiceDescriptor* psd = service->GetDescriptor();
    int method_cnt = psd->method_count();
    for (int i = 0; i < method_cnt; ++i) {
        const google::protobuf::MethodDescriptor* pmd = psd->method(i);
        service_info.method_map.emplace(pmd->name(), pmd);
    }
    // Keep the service object itself so OnMessage can call it later.
    service_info.service = service;
    m_services.emplace(psd->name(), service_info);
}
```
Run

Run's main work:

- read the RPC server's ip and port from the configuration file
- create a muduo TcpServer and bind the connection callback and the message callback separately, decoupling connection handling from message handling
- publish every service registered in m_services to the ZooKeeper server, so that RPC clients can discover the services this RPC server provides
- once all services are published, start the muduo network service
```cpp
void KrpcProvider::Run() {
    std::string ip = KrpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    int port = atoi(KrpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    muduo::net::InetAddress address(ip, port);

    auto server = std::make_shared<muduo::net::TcpServer>(&m_eventloop, address, "KrpcProvider");
    server->setConnectionCallback(std::bind(&KrpcProvider::OnConnection, this,
                                            std::placeholders::_1));
    server->setMessageCallback(std::bind(&KrpcProvider::OnMessage, this,
                                         std::placeholders::_1,
                                         std::placeholders::_2,
                                         std::placeholders::_3));
    server->setThreadNum(4);

    ZkClient zkclient;
    zkclient.Start();
    std::cout << "KrpcProvider: zkclient Start success!" << std::endl;
    for (auto& service : m_services) {
        // Service znode, e.g. /UserServiceRpc (permanent, no data)
        std::string service_path = "/" + service.first;
        std::cout << "KrpcProvider: zkclient Create znode: " << service_path << std::endl;
        zkclient.Create(service_path.c_str(), nullptr, 0);
        for (auto& method : service.second.method_map) {
            // Method znode, e.g. /UserServiceRpc/Login, data = "ip:port" (ephemeral)
            std::string method_path = service_path + "/" + method.first;
            char method_path_data[128] = {0};
            sprintf(method_path_data, "%s:%d", ip.c_str(), port);
            zkclient.Create(method_path.c_str(), method_path_data,
                            strlen(method_path_data), ZOO_EPHEMERAL);
        }
    }

    std::cout << "RpcProvider start service at ip:" << ip << ", port:" << port << std::endl;
    server->start();
    m_eventloop.loop();
}
```
OnConnection

If the connection is no longer alive, shutdown is called to close it; beyond that, connection events need no special handling.
```cpp
void KrpcProvider::OnConnection(const muduo::net::TcpConnectionPtr& conn) {
    if (!conn->connected()) {
        conn->shutdown();
    }
}
```
OnMessage

OnMessage's main work:

- handle TCP sticky packets and deserialize the call parameters out of the byte stream
- locate the corresponding service and method from those fields
- build the request and response objects for the RPC call, invoke the local method, and send the response through a callback

How are TCP sticky packets handled? The byte stream is split into the following parts (see the layout sketch below):

- header_size: a Varint32 recording the length of header_str
- header_str: carries the service name, method name, and argument length (defined in KrpcHeader.proto)
- args_str: the serialized arguments used to call the method
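For reference, a single request on the wire (as framed by KrpcChannel::CallMethod further below) is laid out roughly like this:

```
+---------------------+----------------------------------------+-----------------------------+
| header_size         | header_str (header_size bytes)         | args_str (args_size bytes)  |
| Varint32            | RpcHeader: service_name, method_name,  | serialized request message  |
|                     | args_size                              |                             |
+---------------------+----------------------------------------+-----------------------------+
```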
What is NewCallback?

NewCallback returns a google::protobuf::Closure object, which can be thought of as a predefined callback. The Closure object acts like a closure: it captures

- a member function of some object (here SendRpcResponse), and
- the arguments that member function needs (here conn and response)
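A minimal standalone sketch of how NewCallback is used (the Greeter class and its argument are made up purely for illustration):

```cpp
#include <google/protobuf/stubs/callback.h>
#include <iostream>
#include <string>

class Greeter {
public:
    void SayHello(std::string name) { std::cout << "hello, " << name << std::endl; }
};

int main() {
    Greeter g;
    // NewCallback captures the object, the member function, and its argument.
    google::protobuf::Closure* done =
        google::protobuf::NewCallback(&g, &Greeter::SayHello, std::string("krpc"));
    done->Run();  // invokes g.SayHello("krpc"); the closure deletes itself after Run()
    return 0;
}
```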
What is CallMethod?

CallMethod is implemented in UserServiceRpc (auto-generated by protoc). Its job is to invoke the method published on the current RPC node according to the remote RPC request. request and response carry the parameters of the call, and done is the callback run after the method finishes; here it is set to SendRpcResponse.
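Roughly speaking, the generated UserServiceRpc::CallMethod just dispatches on the method index (a simplified sketch, not the exact generated code, which uses protobuf's internal down-cast helpers):

```cpp
// Simplified sketch of the generated dispatch in user.pb.cc.
void UserServiceRpc::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                                ::google::protobuf::RpcController* controller,
                                const ::google::protobuf::Message* request,
                                ::google::protobuf::Message* response,
                                ::google::protobuf::Closure* done) {
    switch (method->index()) {
        case 0:  // Login
            Login(controller,
                  static_cast<const ::Kuser::LoginRequest*>(request),
                  static_cast<::Kuser::LoginResponse*>(response),
                  done);
            break;
        case 1:  // Register
            Register(controller,
                     static_cast<const ::Kuser::RegisterRequest*>(request),
                     static_cast<::Kuser::RegisterResponse*>(response),
                     done);
            break;
    }
}
```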
```cpp
void KrpcProvider::OnMessage(const muduo::net::TcpConnectionPtr& conn,
                             muduo::net::Buffer* buffer,
                             muduo::Timestamp receive_time) {
    std::cout << "OnMessage" << std::endl;
    std::string recv_buf = buffer->retrieveAllAsString();

    google::protobuf::io::ArrayInputStream raw_input(recv_buf.data(), recv_buf.size());
    google::protobuf::io::CodedInputStream coded_input(&raw_input);

    uint32_t header_size{};
    coded_input.ReadVarint32(&header_size);

    std::string rpc_header_str;
    Krpc::RpcHeader krpcHeader;
    std::string service_name;
    std::string method_name;
    uint32_t args_size{};

    google::protobuf::io::CodedInputStream::Limit msg_limit = coded_input.PushLimit(header_size);
    coded_input.ReadString(&rpc_header_str, header_size);
    coded_input.PopLimit(msg_limit);

    if (krpcHeader.ParseFromString(rpc_header_str)) {
        service_name = krpcHeader.service_name();
        method_name = krpcHeader.method_name();
        args_size = krpcHeader.args_size();
    } else {
        std::cout << "Error: krpcHeader parse error" << std::endl;
        return;
    }

    std::string args_str;
    if (!coded_input.ReadString(&args_str, args_size)) {
        std::cout << "Error: read args error" << std::endl;
        return;
    }

    auto sit = m_services.find(service_name);
    if (sit == m_services.end()) {
        std::cout << service_name << " is no exist!" << std::endl;
        return;
    }
    auto mit = sit->second.method_map.find(method_name);
    if (mit == sit->second.method_map.end()) {
        std::cout << method_name << " is no exist!" << std::endl;
        return;
    }

    google::protobuf::Service* service = sit->second.service;
    const google::protobuf::MethodDescriptor* method = mit->second;

    google::protobuf::Message* request = service->GetRequestPrototype(method).New();
    if (!request->ParseFromString(args_str)) {
        std::cout << service_name << '.' << method_name << " parse error!" << std::endl;
        return;
    }
    google::protobuf::Message* response = service->GetResponsePrototype(method).New();

    google::protobuf::Closure* done =
        google::protobuf::NewCallback<KrpcProvider,
                                      const muduo::net::TcpConnectionPtr&,
                                      google::protobuf::Message*>(
            this, &KrpcProvider::SendRpcResponse, conn, response);

    service->CallMethod(method, nullptr, request, response, done);
}
```
SendRpcResponse

SendRpcResponse is the callback passed to CallMethod in OnMessage; it runs after the remotely invoked method finishes. It serializes the response and sends it back to the RPC client.
```cpp
void KrpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn,
                                   google::protobuf::Message* response) {
    std::string response_str;
    if (response->SerializeToString(&response_str)) {
        conn->send(response_str);
    } else {
        std::cout << "Serialize Error!" << std::endl;
    }
}
```
zookeeperutil

zookeeperutil wraps a few of the APIs provided by ZooKeeper.
```cpp
#pragma once
#include <semaphore.h>
#include <zookeeper/zookeeper.h>
#include <string>

class ZkClient {
public:
    ZkClient();
    ~ZkClient();
    void Start();
    void Create(const char* path, const char* data, int datalen, int state = 0);
    std::string GetData(const char* path);

private:
    zhandle_t* m_zhandle;
};
```
global_watcher

The watcher mechanism

global_watcher defines a global watcher: when a znode changes, the ZooKeeper server notifies the ZooKeeper client through this callback.

The global_watcher here only handles watcher events with type == ZOO_SESSION_EVENT && state == ZOO_CONNECTED_STATE. The goal is to guarantee that, by the time Start() returns, the connection between the ZooKeeper client (i.e. the RPC server) and the ZooKeeper server has been established.

Why is the watcher mechanism needed for this guarantee? Because establishing the connection between the ZooKeeper client and server is asynchronous. The zookeeper_mt client library uses the following three threads:

- main thread: the thread in which the user calls the API.
- IO thread: the thread responsible for network communication.
- completion thread: for asynchronous requests (ZooKeeper's asynchronous APIs, usually named zoo_a*) and watcher callbacks, the IO thread hands the responses to the completion thread for processing.

The main thread gets the ZooKeeper handle back as soon as the API call returns, while the IO thread may not yet have finished establishing the connection. So the watcher mechanism, combined with a condition variable, is used to guarantee the connection is established before Start() returns.
```cpp
#include "zookeeperutil.h"
#include <mutex>
#include <condition_variable>
#include <iostream>
#include "KrpcApplication.h"

std::mutex cv_mutex;
std::condition_variable cv;
bool isConnected = false;

// Global watcher: the zk server notifies the zk client through this callback.
// Only the "session connected" event is handled, to signal Start() that the
// connection has been established.
void global_watcher(zhandle_t* zh, int type, int state, const char* path, void* watcher_ctx) {
    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            std::lock_guard<std::mutex> lock(cv_mutex);
            isConnected = true;
        }
    }
    cv.notify_all();
}

ZkClient::ZkClient() : m_zhandle(nullptr) {
}

ZkClient::~ZkClient() {
    if (m_zhandle != nullptr) {
        zookeeper_close(m_zhandle);
    }
}

void ZkClient::Start() {
    std::string host = KrpcApplication::GetInstance().GetConfig().Load("zookeeperip");
    std::string port = KrpcApplication::GetInstance().GetConfig().Load("zookeeperport");
    std::string conn_str = host + ":" + port;

    m_zhandle = zookeeper_init(conn_str.c_str(), global_watcher, 6000, nullptr, nullptr, 0);
    if (m_zhandle == nullptr) {
        std::cout << "zookeeper_init error!" << std::endl;
        exit(EXIT_FAILURE);
    }
    // Block until global_watcher reports that the session is connected.
    std::unique_lock<std::mutex> lock(cv_mutex);
    cv.wait(lock, [] { return isConnected; });
    std::cout << "zookeeper_init success!" << std::endl;
}

void ZkClient::Create(const char* path, const char* data, int datalen, int state) {
    char path_buffer[128];
    int bufferlen = sizeof(path_buffer);
    int flag = zoo_exists(m_zhandle, path, 0, nullptr);
    if (flag == ZNONODE) {
        flag = zoo_create(m_zhandle, path, data, datalen,
                          &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
        if (flag == ZOK) {
            std::cout << "znode create success, path=" << path << std::endl;
        } else {
            std::cout << "znode create fail, path=" << path << std::endl;
            exit(EXIT_FAILURE);
        }
    }
}

std::string ZkClient::GetData(const char* path) {
    char buffer[64];
    int bufferlen = sizeof(buffer);
    int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
    if (flag == ZOK) {
        return buffer;
    } else {
        std::cout << "zoo_get error!" << std::endl;
        return "";
    }
}
```
KrpcController

KrpcController's main job is to track the status of an RPC call, record error information, and provide control operations (such as cancelling a call). Only the most basic functionality is implemented here.
```cpp
#pragma once
#include <google/protobuf/service.h>
#include <string>

class KrpcController : public google::protobuf::RpcController {
public:
    KrpcController();
    void Reset();
    bool Failed() const { return m_failed; }
    std::string ErrorText() const { return m_errmsg; }
    void SetFailed(const std::string& reason);

    void StartCancel();
    bool IsCanceled() const;
    void NotifyOnCancel(google::protobuf::Closure* callback);

private:
    bool m_failed;
    std::string m_errmsg;
};
```
```cpp
#include "KrpcController.h"

KrpcController::KrpcController()
    : m_failed(false)
    , m_errmsg("") {
}

void KrpcController::Reset() {
    m_failed = false;
    m_errmsg = "";
}

void KrpcController::SetFailed(const std::string& reason) {
    m_failed = true;
    m_errmsg = reason;
}

void KrpcController::StartCancel() {}

bool KrpcController::IsCanceled() const {
    return false;
}

void KrpcController::NotifyOnCancel(google::protobuf::Closure* callback) {}
```
RPC client

KrpcChannel

Use of polymorphism: the constructor of UserServiceRpc_Stub must be given a google::protobuf::RpcChannel, so we have to implement our own KrpcChannel that inherits from google::protobuf::RpcChannel and overrides its CallMethod method.

How should CallMethod be understood?
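In short: every call made through the stub (e.g. stub.Login(...)) is forwarded to the channel's CallMethod, which is where the actual network communication happens. A rough sketch of the generated stub method (simplified, not the exact generated code):

```cpp
// Simplified sketch of the generated stub method in user.pb.cc:
// the stub does no networking itself; it delegates to the channel,
// which is why KrpcChannel::CallMethod is where the real work is done.
void UserServiceRpc_Stub::Login(::google::protobuf::RpcController* controller,
                                const ::Kuser::LoginRequest* request,
                                ::Kuser::LoginResponse* response,
                                ::google::protobuf::Closure* done) {
    channel_->CallMethod(descriptor()->method(0), controller, request, response, done);
}
```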
```cpp
#pragma once
#include <google/protobuf/service.h>
#include "zookeeperutil.h"

class KrpcChannel : public google::protobuf::RpcChannel {
public:
    KrpcChannel(bool connectNow);
    virtual ~KrpcChannel() {}
    void CallMethod(const ::google::protobuf::MethodDescriptor* method,
                    ::google::protobuf::RpcController* controller,
                    const ::google::protobuf::Message* request,
                    ::google::protobuf::Message* response,
                    ::google::protobuf::Closure* done) override;

private:
    bool newConnect(const char* ip, uint16_t port);
    std::string QueryServiceHost(ZkClient* zkclient,
                                 const std::string& service_name,
                                 const std::string& method_name,
                                 int& idx);

private:
    int m_clientSock;
    std::string m_service_name;
    std::string m_method_name;
    std::string m_ip;
    uint16_t m_port;
    int m_idx;
};
```
Constructor

If connectNow == true, it tries to connect to the RPC server immediately (retrying a few times on failure); otherwise the connection is deferred until CallMethod is invoked.
```cpp
KrpcChannel::KrpcChannel(bool connectNow)
    : m_clientSock(-1)
    , m_idx(0) {
    if (!connectNow) {
        return;
    }
    auto rt = newConnect(m_ip.c_str(), m_port);
    int cnt = 3;
    while (!rt && cnt--) {
        rt = newConnect(m_ip.c_str(), m_port);
    }
}
```
CallMethod

CallMethod's main work:

- connect to the RPC server: query the ZooKeeper server for the ip and port, then call newConnect to connect
- serialize the request: pack header_size, header_str, and args_str (args_size travels inside the header)
- send the request: send
- receive the response: recv
- parse the response data
```cpp
void KrpcChannel::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                             ::google::protobuf::RpcController* controller,
                             const ::google::protobuf::Message* request,
                             ::google::protobuf::Message* response,
                             ::google::protobuf::Closure* done) {
    if (m_clientSock == -1) {
        // Not connected yet: look up the provider's address in zookeeper and connect.
        const google::protobuf::ServiceDescriptor* sd = method->service();
        m_service_name = sd->name();
        m_method_name = method->name();

        ZkClient zkCli;
        zkCli.Start();
        std::string server_host = QueryServiceHost(&zkCli, m_service_name, m_method_name, m_idx);
        m_ip = server_host.substr(0, m_idx);
        m_port = atoi(server_host.substr(m_idx + 1, server_host.size() - m_idx).c_str());
        std::cout << "Server ip: " << m_ip << ", port: " << m_port << std::endl;

        bool rt = newConnect(m_ip.c_str(), m_port);
        if (rt) {
            std::cout << "connect server success" << std::endl;
        } else {
            std::cout << "connect server error" << std::endl;
            return;
        }
    }

    // Build the RpcHeader: service name, method name, argument size.
    Krpc::RpcHeader krpcHeader;
    krpcHeader.set_service_name(m_service_name);
    krpcHeader.set_method_name(m_method_name);

    uint32_t args_size{};
    std::string args_str;
    if (request->SerializeToString(&args_str)) {
        args_size = args_str.size();
    } else {
        controller->SetFailed("serialize request fail");
        return;
    }
    krpcHeader.set_args_size(args_size);

    std::string send_rpc_str;
    uint32_t header_size = 0;
    std::string rpc_header_str;
    if (krpcHeader.SerializeToString(&rpc_header_str)) {
        header_size = rpc_header_str.size();
    } else {
        controller->SetFailed("serialize rpc header fail");
        return;
    }

    // Frame the request: varint32 header_size + header_str, followed by the argument bytes.
    {
        google::protobuf::io::StringOutputStream string_output(&send_rpc_str);
        google::protobuf::io::CodedOutputStream coded_output(&string_output);
        coded_output.WriteVarint32(static_cast<uint32_t>(header_size));
        coded_output.WriteString(rpc_header_str);
    }
    send_rpc_str += args_str;

    if (send(m_clientSock, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1) {
        close(m_clientSock);
        perror("send");
        controller->SetFailed("send error");
        return;
    }

    char recv_buf[1024] = {0};
    int recv_size = recv(m_clientSock, recv_buf, sizeof(recv_buf), 0);
    if (recv_size == -1) {
        perror("recv");
        controller->SetFailed("recv error");
        return;
    }

    if (!response->ParseFromArray(recv_buf, recv_size)) {
        close(m_clientSock);
        perror("parse");
        controller->SetFailed("parse error");
        return;
    }
    close(m_clientSock);
}
```
newConnect

After obtaining the server's ip and port, it establishes the TCP connection to the RPC server — the usual socket-programming client connect flow.
```cpp
bool KrpcChannel::newConnect(const char* ip, uint16_t port) {
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (clientfd == -1) {
        perror("socket");
        return false;
    }
    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip);
    if (connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("KrpcChannel::newConnect: connect");
        close(clientfd);  // avoid leaking the fd when the connection fails
        return false;
    }
    m_clientSock = clientfd;
    return true;
}
```
QueryServiceHost

Queries the ZooKeeper server for the host of the RPC server that provides the given service method.
```cpp
std::string KrpcChannel::QueryServiceHost(ZkClient* zkclient,
                                          const std::string& service_name,
                                          const std::string& method_name,
                                          int& idx) {
    std::string method_path = '/' + service_name + '/' + method_name;
    // g_mutex is a file-scope std::mutex guarding the zookeeper lookup.
    std::unique_lock<std::mutex> lock(g_mutex);
    std::string server_host = zkclient->GetData(method_path.c_str());
    lock.unlock();
    if (server_host == "") {
        std::cout << "ERROR: " << method_path << " is no exist! \n";
        return " ";
    }
    idx = server_host.find(":");
    if (idx == -1) {
        std::cout << "ERROR: " << method_path << " address is invalid! \n";
        return " ";
    }
    return server_host;
}
```
Client main file

```cpp
#include "KrpcApplication.h"
#include "KrpcChannel.h"
#include "KrpcController.h"
#include "../user.pb.h"
#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <chrono>

void send_request(int thread_id, std::atomic<int>& success_cnt, std::atomic<int>& fail_cnt) {
    Kuser::UserServiceRpc_Stub stub(new KrpcChannel(false));

    Kuser::LoginRequest request;
    request.set_name("yu");
    request.set_pwd("123456");
    Kuser::LoginResponse response;
    KrpcController controller;

    stub.Login(&controller, &request, &response, nullptr);

    if (controller.Failed()) {
        std::cout << controller.ErrorText() << std::endl;
    } else {
        if (response.result().errcode() == 0) {
            std::cout << "Rpc login response success: " << response.success() << std::endl;
            ++success_cnt;
        } else {
            std::cout << "Rpc login response fail: " << response.result().errmsg() << std::endl;
            ++fail_cnt;
        }
    }
}

int main(int argc, char** argv) {
    KrpcApplication::Init(argc, argv);

    const int thread_cnt = 1000;
    const int request_per_thread = 10;

    std::vector<std::thread> threads;
    std::atomic<int> success_cnt(0);
    std::atomic<int> fail_cnt(0);

    auto start_time = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < thread_cnt; ++i) {
        threads.emplace_back([argc, argv, i, &success_cnt, &fail_cnt]() {
            for (int j = 0; j < request_per_thread; ++j) {
                send_request(i, success_cnt, fail_cnt);
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end_time - start_time;

    std::cout << "Total requests: " << thread_cnt * request_per_thread << std::endl;
    std::cout << "Success count: " << success_cnt << std::endl;
    std::cout << "Fail count: " << fail_cnt << std::endl;
    std::cout << "Elapsed time: " << elapsed.count() << " seconds" << std::endl;
    std::cout << "QPS: " << (thread_cnt * request_per_thread) / elapsed.count() << std::endl;

    return 0;
}
```
send_request

The client calls the remote Login method through send_request.

main

Runs a multi-threaded concurrency test of the RPC framework.
Performance test

Just run the server and the client separately:
```sh
# 1. Generate the .h/.cc files for the proto files in the example and src folders
# 2. Back in the project root, build the executables with:
mkdir build && cd build && cmake .. && make -j$(nproc)
# 3. Enter the bin folder and run the server and the client in separate sessions
./server -i ./test.conf
./client -i ./test.conf
```
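The key names below come from the code above (rpcserverip, rpcserverport, zookeeperip, zookeeperport); the addresses are placeholders, so a test.conf might look like this:

```
# hypothetical test.conf; adjust the addresses to your environment
rpcserverip=127.0.0.1
rpcserverport=8000
zookeeperip=127.0.0.1
zookeeperport=2181
```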
Test environment

- Platform: Alibaba Cloud server
- OS: Ubuntu 22.04
- CPU: 2 cores
- Memory: 2 GB
- Disk: 40 GB
Test results

200 concurrent threads, 10 requests per thread:
```
Total requests: 2000
Success count: 2000
Fail count: 0
Elapsed time: 10.4225 seconds
QPS: 191.892
```
1000 concurrent threads, 10 requests per thread:
```
Total requests: 10000
Success count: 10000
Fail count: 0
Elapsed time: 79.2326 seconds
QPS: 126.21
```