/**
* Copyright (c) 2015 by Contributors
*/
#ifndef PS_INTERNAL_MESSAGE_H_
#define PS_INTERNAL_MESSAGE_H_
#include <vector>
#include <limits>
#include <string>
#include <sstream>
#include "ps/sarray.h"
namespace ps {
/** \brief data type */
enum DataType {
CHAR, INT8, INT16, INT32, INT64,
UINT8, UINT16, UINT32, UINT64,
FLOAT, DOUBLE, OTHER
};
/** \brief data type name */
static const char* DataTypeName[] = {
"CHAR", "INT8", "INT16", "INT32", "INT64",
"UINT8", "UINT16", "UINT32", "UINT64",
"FLOAT", "DOUBLE", "OTHER"
};
/**
* \brief compare if V and W are the same type
*/
template<typename V, typename W>
inline bool SameType() {
return std::is_same<typename std::remove_cv<V>::type, W>::value;
}
/**
* \brief return the DataType of V
*/
template<typename V>
DataType GetDataType() {
if (SameType<V, int8_t>()) {
return INT8;
} else if (SameType<V, int16_t>()) {
return INT16;
} else if (SameType<V, int32_t>()) {
return INT32;
} else if (SameType<V, int64_t>()) {
return INT64;
} else if (SameType<V, uint8_t>()) {
return UINT8;
} else if (SameType<V, uint16_t>()) {
return UINT16;
} else if (SameType<V, uint32_t>()) {
return UINT32;
} else if (SameType<V, uint64_t>()) {
return UINT64;
} else if (SameType<V, float>()) {
return FLOAT;
} else if (SameType<V, double>()) {
return DOUBLE;
} else {
return OTHER;
}
}
/** 分布式节点类 */
/**
* \brief information about a node
*/
struct Node {
/** 空ID默认值,用于判断是否空节点 */
/** \brief the empty value */
static const int kEmpty;
/** \brief default constructor */
Node() : id(kEmpty), port(kEmpty), is_recovery(false) {}
/** 节点角色:服务器、工作者、调度者 */
/** \brief node roles */
enum Role { SERVER, WORKER, SCHEDULER };
/** 调试信息 */
/** \brief get debug string */
std::string DebugString() const {
std::stringstream ss;
ss << "role=" << (role == SERVER ? "server" : (role == WORKER ? "worker" : "scheduler"))
<< (id != kEmpty ? ", id=" + std::to_string(id) : "")
<< ", ip=" << hostname << ", port=" << port << ", is_recovery=" << is_recovery;
return ss.str();
}
/** 简洁调试信息 */
/** \brief get short debug string */
std::string ShortDebugString() const {
std::string str = role == SERVER ? "S" : (role == WORKER ? "W" : "H");
if (id != kEmpty) str += "[" + std::to_string(id) + "]";
return str;
}
/** 节点角色 */
/** \brief the role of this node */
Role role;
/** 节点编号 */
/** \brief node id */
int id;
/** 客户编号 */
/** \brief customer id */
int customer_id;
/** 主机名或IP */
/** \brief hostname or ip */
std::string hostname;
/** 端口号 */
/** \brief the port this node is binding */
int port;
/** 节点失败恢复重建标记 */
/** \brief whether this node is created by failover */
bool is_recovery;
};
/** 系统控制消息的元信息类 */
/**
* \brief meta info of a system control message
*/
struct Control {
/** \brief empty constructor */
Control() : cmd(EMPTY) { }
/** 空命令 */
/** \brief return true is empty */
inline bool empty() const { return cmd == EMPTY; }
/** 调试信息 */
/** \brief get debug string */
std::string DebugString() const {
if (empty()) return "";
std::vector<std::string> cmds = {
"EMPTY", "TERMINATE", "ADD_NODE", "BARRIER", "ACK", "HEARTBEAT"};
std::stringstream ss;
ss << "cmd=" << cmds[cmd];
if (node.size()) {
ss << ", node={";
for (const Node& n : node) ss << " " << n.DebugString();
ss << " }";
}
if (cmd == BARRIER) ss << ", barrier_group=" << barrier_group;
if (cmd == ACK) ss << ", msg_sig=" << msg_sig;
return ss.str();
}
/** 支持的所有控制命令 */
/** \brief all commands */
enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT };
/** 控制命令 */
/** \brief the command */
Command cmd;
/** 节点信息 */
/** \brief node infos */
std::vector<Node> node;
/** 所属节点组 */
/** \brief the node group for a barrier, such as kWorkerGroup */
int barrier_group;
/** 消息签名 */
/** message signature */
uint64_t msg_sig;
};
/** 消息的元信息 */
/**
* \brief meta info of a message
*/
struct Meta {
/** 空标志 */
/** \brief the empty value */
static const int kEmpty;
/** \brief default constructor */
Meta() : head(kEmpty), app_id(kEmpty), customer_id(kEmpty),
timestamp(kEmpty), sender(kEmpty), recver(kEmpty),
request(false), push(false), pull(false), simple_app(false) {}
/** 调试信息 */
std::string DebugString() const {
std::stringstream ss;
if (sender == Node::kEmpty) {
ss << "?";
} else {
ss << sender;
}
ss << " => " << recver;
ss << ". Meta: request=" << request;
if (timestamp != kEmpty) ss << ", timestamp=" << timestamp;
if (!control.empty()) {
ss << ", control={ " << control.DebugString() << " }";
} else {
ss << ", app_id=" << app_id
<< ", customer_id=" << customer_id
<< ", simple_app=" << simple_app
<< ", push=" << push;
}
if (head != kEmpty) ss << ", head=" << head;
if (body.size()) ss << ", body=" << body;
if (data_type.size()) {
ss << ", data_type={";
for (auto d : data_type) ss << " " << DataTypeName[static_cast<int>(d)];
ss << " }";
}
return ss.str();
}
/** 头信息 */
/** \brief an int head */
int head;
/** 消息应用方的唯一ID */
/** \brief the unique id of the application of messsage is for*/
int app_id;
/** 客户ID */
/** \brief customer id*/
int customer_id;
/** 消息的时间戳 */
/** \brief the timestamp of this message */
int timestamp;
/** 消息发送方的节点ID */
/** \brief the node id of the sender of this message */
int sender;
/** 消息接收方的节点ID */
/** \brief the node id of the receiver of this message */
int recver;
/** 是否是请求消息 */
/** \brief whether or not this is a request message*/
bool request;
/** 是否属于推送消息 */
/** \brief whether or not a push message */
bool push;
/** 是否属于拉取消息 */
/** \brief whether or not a pull message */
bool pull;
/** 标志 */
/** \brief whether or not it's for SimpleApp */
bool simple_app;
/** 消息内容 */
/** \brief an string body */
std::string body;
/** 消息中数据内容的数据类型 */
/** \brief data type of message.data[i] */
std::vector<DataType> data_type;
/** 控制信息 */
/** \brief system control message */
Control control;
/** 数据内容大小 */
/** \brief the byte size */
int data_size = 0;
/** 消息优先级 */
/** \brief message priority */
int priority = 0;
};
/** 节点之间通信的消息类 */
/**
* \brief messages that communicated amaong nodes.
*/
struct Message {
/** 消息元数据:描述消息数据域的类型、大小信息以及消息自身相关信息 */
/** \brief the meta info of this message */
Meta meta;
/** 消息数据域 */
/** \brief the large chunk of data of this message */
std::vector<SArray<char> > data;
/** 填充消息:数组字节化后填入数据域,并记录数据类型、大小到消息元数据中 */
/**
* \brief push array into data, and add the data type
*/
template <typename V>
void AddData(const SArray<V>& val) {
CHECK_EQ(data.size(), meta.data_type.size());
meta.data_type.push_back(GetDataType<V>());
SArray<char> bytes(val);
meta.data_size += bytes.size();
data.push_back(bytes);
}
/** 调试信息 */
std::string DebugString() const {
std::stringstream ss;
ss << meta.DebugString();
if (data.size()) {
ss << " Body:";
for (const auto& d : data) ss << " data_size=" << d.size();
}
return ss.str();
}
};
} // namespace ps
#endif // PS_INTERNAL_MESSAGE_H_
ps-lite源码分析: include/ps/internal/message.h
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- /*** Copyright (c) 2015 by Contributors*/#ifndef PS_INTE...
- /*** Copyright (c) 2015 by Contributors*/#ifndef PS_BASE...