Customer是通信对象,每个Customer都与某个node id相绑定,代表当前节点发送到对应node id节点。Customer对象维护request和response的状态,其中tracker_成员记录每个请求可能发送给了多少节点以及从多少个节点返回。tracker_ 下标为每个req标识的timestamp。Customer也会启动一个receiving thread,它接受的消息来自于Van的receiving thread,即每个节点的Van对象收到message后,根据message的不同,推送到不同的customer对象中。
作为发送方,它追踪每个发送请求的响应,它具有自己的接收线程,能够处理来自远程节点的一些消息,但这些消息目的地必须与Customer的ID相同;
它具有消息处理函数,以及消息优先级队列, 接受Van接收的消息;它对每个消息(必须指定接收方)产生一个时间戳,发送消息后等待直至消息处理完成;
/**
* Copyright (c) 2015 by Contributors
*/
#ifndef PS_INTERNAL_CUSTOMER_H_
#define PS_INTERNAL_CUSTOMER_H_
#include <mutex>
#include <vector>
#include <utility>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <thread>
#include <memory>
#include "ps/internal/message.h"
#include "ps/internal/threadsafe_pqueue.h"
namespace ps {
/**
* 通信对象,作为发送方,它追踪每个发送请求的响应,具有自己的接收线程,
* 能够处理从远程节点接收的一切消息,这些远程节点的消息的msg.meta.customer_id
* 与Customer具有相同ID。
*/
/**
* \brief The object for communication.
*
* As a sender, a customer tracks the responses for each request sent.
*
* It has its own receiving thread which is able to process any message received
* from a remote node with `msg.meta.customer_id` equal to this customer's id
*/
class Customer {
public:
/** 所接收消息的处理函数 */
/**
* \brief the handle for a received message
* \param recved the received message
*/
using RecvHandle = std::function<void(const Message& recved)>;
/**
* app_id 全局唯一ID,表示postoffice所服务的应用
* customer_id 本地唯一ID,表示postoffice的Customer
* recv_handle 处理接收消息的函数
*/
/**
* \brief constructor
* \param app_id the globally unique id indicating the application the postoffice
* serving for
* \param customer_id the locally unique id indicating the customer of a postoffice
* \param recv_handle the functino for processing a received message
*/
Customer(int app_id, int customer_id, const RecvHandle& recv_handle);
/**
* \brief desconstructor
*/
~Customer();
/**
* \brief return the globally unique application id
*/
inline int app_id() { return app_id_; }
/**
* \brief return the locally unique customer id
*/
inline int customer_id() { return customer_id_; }
/**
* 线程安全,对新的请求获取时间戳
* recver 请求的接收节点
*/
/**
* \brief get a timestamp for a new request. threadsafe
* \param recver the receive node id of this request
* \return the timestamp of this request
*/
int NewRequest(int recver);
/**
* 线程安全,等待直到请求完成
* timestamp 请求的时间戳
*/
/**
* \brief wait until the request is finished. threadsafe
* \param timestamp the timestamp of the request
*/
void WaitRequest(int timestamp);
/**
* 线程安全,返回请求响应的数量
* timestamp 请求的时间戳
*/
/**
* \brief return the number of responses received for the request. threadsafe
* \param timestamp the timestamp of the request
*/
int NumResponse(int timestamp);
/** 增加响应的数量至时间戳 */
/**
* \brief add a number of responses to timestamp
*/
void AddResponse(int timestamp, int num = 1);
/**
* 线层安全,接受来自从Van的消息
* recved 接收的消息
*/
/**
* \brief accept a received message from \ref Van. threadsafe
* \param recved the received the message
*/
inline void Accept(const Message& recved) {
recv_queue_.Push(recved);
}
private:
/** 线程运行函数 */
/**
* \brief the thread function
*/
void Receiving();
/** 应用ID */
int app_id_;
/** 客户ID */
int customer_id_;
/** 接收处理函数 */
RecvHandle recv_handle_;
/** 线程安全的优先级队列 */
ThreadsafePQueue recv_queue_;
/** 接收线程 */
std::unique_ptr<std::thread> recv_thread_;
/** 跟踪的互斥锁 */
std::mutex tracker_mu_;
/** 追踪的信号量 */
std::condition_variable tracker_cond_;
/** 追踪 */
std::vector<std::pair<int, int>> tracker_;
DISALLOW_COPY_AND_ASSIGN(Customer);
};
} // namespace ps
#endif // PS_INTERNAL_CUSTOMER_H_