一 : Stomp
HTTP处在应用层,而WebSocket处在TCP上,并且内容不多,是一个消息架构,不包含特定的解释协议,所以还得有专门的协议来解释消息,有很多,Stomp是其中之一.
stomp以帧来封装消息,一个帧由一个命令,加上header(可以是多个),再加上body(文本或二进制),组装出来的是一段字符串.
命令的类型:
CONNECT、SEND、SUBSCRIBE、UNSUBSCRIBE、BEGIN、COMMIT、ABORT、ACK、NACK、DISCONNECT
例如发送一个消息
SEND
destination:/queue/a
content-type:text/plain
hello queue a
^@
订阅消息
SUBSCRIBE
id:0
destination:/queue/foo
ack:client
^@
二 : SocketRocket
SocketRocket是Facebook维护的iOS和mac os 上的webSocket库,是OC实现的,是比较推荐的一个.
1.建立连接
Springboot基于STOMP实现的webSocket可以将http模拟成Socket,所以建立连接的url可能是一个"http://"
在iOS端SocketRocket库也可以支持STOMP.
pod 'SocketRocket'
var request = URLRequest.init(url: .init(string: "")!, cachePolicy: .useProtocolCachePolicy, timeoutInterval: 10)
//给request header添加一些key-value
request.setValue("", forHTTPHeaderField: "")
socket = SRWebSocket.init(urlRequest: request)
url可以是http(或者ws;wss)://域名(或者IP):端口/路径(/websocket)
最后可以加上"/websocket"强制使用websocket协议
例如http://test.com:9090/test/websocket
2.但是SocketRocket并没有实现Stomp协议的相关API,所以如果需要发送帧,就需要手动拼写frame;
就是命令 + 换行 + headerkey + : + headerValue + ... + 换行 + body + 终止字符
同样接受到的消息也是一个Frame,也需要解析
private func sendFrame(command: String?, header: [String: String]?, body: AnyObject?) {
var frameString = ""
if command != nil {
frameString = command! + "\n"
}
if let header = header {
for (key, value) in header {
frameString += key
frameString += ":"
frameString += value
frameString += "\n"
}
}
if let body = body as? String {
frameString += "\n"
frameString += body
} else if let _ = body as? NSData {
}
if body == nil {
frameString += "\n"
}
frameString += String(format: "%C", arguments: [0x00])
if socket?.readyState == .OPEN {
do{
try self.socket?.send(string: frameString)
}catch{}
}
}
三:StompKit
StompKit提供了封装和解析Frame的方法;以及CONNECT SUBSCRIBE ACK等命令的方法;
StompKit本身是基于CocoaAsyncSocket的,是纯OC的
1.预定义
#define kCommandAbort @"ABORT"
#define kCommandAck @"ACK"
#define kCommandBegin @"BEGIN"
#define kCommandCommit @"COMMIT"
#define kCommandConnect @"CONNECT"
#define kCommandConnected @"CONNECTED"
#define kCommandDisconnect @"DISCONNECT"
#define kCommandError @"ERROR"
#define kCommandMessage @"MESSAGE"
#define kCommandNack @"NACK"
#define kCommandReceipt @"RECEIPT"
#define kCommandSend @"SEND"
#define kCommandSubscribe @"SUBSCRIBE"
#define kCommandUnsubscribe @"UNSUBSCRIBE"
#pragma mark Control characters
#define kLineFeed @"\x0A"
#define kNullChar @"\x00"
#define kHeaderSeparator @":"
2.构造Frame
- (NSString *)toString {
NSMutableString *frame = [NSMutableString stringWithString: [self.command stringByAppendingString:kLineFeed]];
for (id key in self.headers) {
[frame appendString:[NSString stringWithFormat:@"%@%@%@%@", key, kHeaderSeparator, self.headers[key], kLineFeed]];
}
[frame appendString:kLineFeed];
if (self.body) {
[frame appendString:self.body];
}
[frame appendString:kNullChar];
return frame;
}
解析Frame
+ (STOMPFrame *) STOMPFrameFromData:(NSData *)data {
NSData *strData = [data subdataWithRange:NSMakeRange(0, [data length])];
NSString *msg = [[NSString alloc] initWithData:strData encoding:NSUTF8StringEncoding];
LogDebug(@"<<< %@", msg);
NSMutableArray *contents = (NSMutableArray *)[[msg componentsSeparatedByString:kLineFeed] mutableCopy];
while ([contents count] > 0 && [contents[0] isEqual:@""]) {
[contents removeObjectAtIndex:0];
}
NSString *command = [[contents objectAtIndex:0] copy];
NSMutableDictionary *headers = [[NSMutableDictionary alloc] init];
NSMutableString *body = [[NSMutableString alloc] init];
BOOL hasHeaders = NO;
[contents removeObjectAtIndex:0];
for(NSString *line in contents) {
if(hasHeaders) {
for (int i=0; i < [line length]; i++) {
unichar c = [line characterAtIndex:i];
if (c != '\x00') {
[body appendString:[NSString stringWithFormat:@"%c", c]];
}
}
} else {
if ([line isEqual:@""]) {
hasHeaders = YES;
} else {
NSMutableArray *parts = [NSMutableArray arrayWithArray:[line componentsSeparatedByString:kHeaderSeparator]];
// key ist the first part
NSString *key = parts[0];
[parts removeObjectAtIndex:0];
headers[key] = [parts componentsJoinedByString:kHeaderSeparator];
}
}
}
return [[STOMPFrame alloc] initWithCommand:command headers:headers body:body];
}
3.订阅的同时维护一个字典(subscriptions)来存储频道和收到消息的回调block
- (STOMPSubscription *)subscribeTo:(NSString *)destination
headers:(NSDictionary *)headers
messageHandler:(STOMPMessageHandler)handler {
NSMutableDictionary *subHeaders = [[NSMutableDictionary alloc] initWithDictionary:headers];
subHeaders[kHeaderDestination] = destination;
NSString *identifier = subHeaders[kHeaderID];
if (!identifier) {
identifier = [NSString stringWithFormat:@"sub-%d", idGenerator++];
subHeaders[kHeaderID] = identifier;
}
self.subscriptions[identifier] = handler;
[self sendFrameWithCommand:kCommandSubscribe
headers:subHeaders
body:nil];
return [[STOMPSubscription alloc] initWithClient:self identifier:identifier];
}
四 : WebsocketStompKit
WebsocketStompKit是用Jetfire为基础,然后再结合StompKit的思路来封装Frame,和connect,subscribe等操作
Jetfire还有一个swift版本叫starscream,不过Jetfire用的比较少
五 : StompClientLib
基于socketRocket然后封装了stomp协议相关操作的库,并且stomp部分是用swift实现的.
不过代码比较旧,而且个人认为有很多不太好的逻辑;
不过实质就是对STOMP协议的封装和解析,没有很多代码,这个库就一个文件;
所以建议直接放到项目里,根据实际业务直接魔改.
连接
var socketClient = StompClientLib()
let url = NSURL(string: "your-socket-url-is-here")!
socketClient.openSocketWithURLRequest(request: NSURLRequest(url: url as URL) , delegate: self)
订阅
let destination = "/topic/your_topic"
let ack = destination
let id = destination
let header = ["destination": destination, "ack": ack, "id": id]
// subscribe
socketClient?.subscribeWithHeader(destination: destination, withHeader: header)
// unsubscribe
socketClient?.unsubscribe(destination: subsId)
在实际使用的时候发现几个问题:
- func openSocket()方法判断了socketRocket在非.CLOSED的状态进入open();但是webSocket还有一个.CLOSING状态,如果做了多服务器支持,其中一台挂掉的时候,可能会长期处理这个状态,所以我也加上了;
另外现在iOS废弃了SSL免认证的API ,所以certificateCheckEnabled我也删了
private func openSocket() {
if socket == nil || socket?.readyState == .CLOSED || socket?.readyState == .CLOSING{
self.socket = SRWebSocket(urlRequest: urlRequest! as URLRequest)
socket!.delegate = self
socket!.open()
}
}
2.我觉着connection属性没有很好的发挥作用,我修改了下,在webSocketDidOpen()时设置为true;
在webSocket(_ webSocket: SRWebSocket, didCloseWithCode code: Int, reason: String?, wasClean: Bool) 设置为false;其他地方都不需要赋值;
3.在func closeSocket()中,self.socket!.close()之后,设置delegate=nil和socket=nil;
这个也是有有问题的,这样在主动断开连接之后,收不到func webSocket(_ webSocket: SRWebSocket, didCloseWithCode code: Int, reason: String?, wasClean: Bool)代理方法的回调;
所以我也修改了,结合上面的第2点.
private func closeSocket(){
if let delegate = delegate {
DispatchQueue.main.async(execute: {
delegate.stompClientDidDisconnect(client: self)
if self.socket != nil {
// Close the socket
self.socket!.close()
}
})
}
}
4.reconnectTimer在调用stopReconnect()之后还是在执行,我直接换成了DispatchSourceTimer
public func reconnect(request: NSURLRequest, delegate: StompClientLibDelegate, connectionHeaders: [String: String] = [String: String](), time: Double = 1.0, exponentialBackoff: Bool = true){
reconnectTimer = DispatchSource.makeTimerSource(flags: .strict, queue: .main)
reconnectTimer?.schedule(deadline: .now(), repeating: time)
reconnectTimer?.setEventHandler(handler: {
self.reconnectLogic(request: request, delegate: delegate
, connectionHeaders: connectionHeaders)
})
reconnectTimer?.resume()
}