以中间件,路由,跨进程事件的姿势使用WebSocket--Node.js篇

上一篇文章介绍了在浏览器端以中间件,路由,跨进程事件的姿势使用原生WebSocket。这篇文章将介绍如何使用Node.js以相同的编程模式来实现WebSocket服务端。

Node.js中比较流行的两个WebSocket库分别是socket.iows。其中socket.io已经实现了跨进程事件,广播,群发等功能,并且服务端与浏览器端是配套的,在不支持WebSocket技术的浏览器会降级为使用ajax轮询。所以。这里选择使用相对而言较为底层或原始的ws,在其基础上实现文章标题所提到的编程模式。

WS

使用ws简简单单就可以启动一个WebSocket服务:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

上面的wss支持的事件有connection,close,error,headers,listening,ws支持的事件有message,close,error。更多详情可以这里

中间件

对ws进行封装,上面提到的事件:WSS的connection,ws的message,close,error。分别提供注册中间件的接口

class EasySocket {
    constructor() {
        this.connectionMiddleware = [];
        this.closeMiddleware = [];
        this.messageMiddleware = [];
        this.errorMiddleware = [];

        this.connectionFn = Promise.resolve();
        this.closeFn = Promise.resolve();
        this.messageFn = Promise.resolve();
        this.errorFn = Promise.resolve();
    }
    connectionUse(fn, runtime) {
        this.connectionMiddleware.push(fn);
        if (runtime) {
            this.connectionFn = compose(this.connectionMiddleware);
        }
        return this;
    }
    closeUse(fn, runtime) {
        this.closeMiddleware.push(fn);
        if (runtime) {
            this.closeFn = compose(this.closeMiddleware);
        }
        return this;
    }
    messageUse(fn, runtime) {
        this.messageMiddleware.push(fn);
        if (runtime) {
            this.messageFn = compose(this.messageMiddleware);
        }
        return this;
    }
    errorUse(fn, runtime) {
        this.errorMiddleware.push(fn);
        if (runtime) {
            this.errorFn = compose(this.errorMiddleware);
        }
        return this;
    }
    
}

通过xxxUse注册相应的中间件。 xxxMiddleware中就是相应的中间件。xxxFn是中间件通过compose处理后的结构。使用runtime参数可以在运行时注册中间件。

再添加一个listen方法,处理相应的中间件并且实例化WebSocket.Server

listen(config) {
        this.socket = new WebSocket.Server(config);
        this.connectionFn = compose(this.connectionMiddleware);
        this.messageFn = compose(this.messageMiddleware);
        this.closeFn = compose(this.closeMiddleware);
        this.errorFn = compose(this.errorMiddleware);
        this.socket.on('connection', (client, req) => {
            let context = { server: this, client, req };
            this.connectionFn(context).catch(error => { console.log(error) });

            client.on('message', (message) => {
                let req;
                try {
                    req = JSON.parse(message);
                } catch (error) {
                    req = message;
                }
                let messageContext = { server: this, client, req }
                this.messageFn(messageContext).catch(error => { console.log(error) })
            });

            client.on('close', (code, message) => {
                let closeContext = { server: this, client, code, message };
                this.closeFn(closeContext).catch(error => { console.log(error) })
            });

            client.on('error', (error) => {
                let errorContext = { server: this, client, error };
                this.errorFn(errorContext).catch(error => { console.log(error) })
            });
        })
    }

使用koa-compose模块处理中间件。注意xxContext传入了哪些东西,后续定义中间件的时候都可以使用。

compose的作用可看这篇文章 傻瓜式解读koa中间件处理模块koa-compose

使用:

import EasySocket from 'easy-socket-node';

const config = {
    port: 3001,
    perMessageDeflate: {
        zlibDeflateOptions: { // See zlib defaults.
            chunkSize: 1024,
            memLevel: 7,
            level: 3,
        },
        zlibInflateOptions: {
            chunkSize: 10 * 1024
        },
        // Other options settable:
        clientNoContextTakeover: true, // Defaults to negotiated value.
        serverNoContextTakeover: true, // Defaults to negotiated value.
        //clientMaxWindowBits: 10,       // Defaults to negotiated value.
        serverMaxWindowBits: 10,       // Defaults to negotiated value.
        // Below options specified as default values.
        concurrencyLimit: 10,          // Limits zlib concurrency for perf.
        threshold: 1024,               // Size (in bytes) below which messages
        // should not be compressed.
    }
}
const easySocket = new EasySocket();
//使用中间件获取token
easySocket
    .connectionUse((context,next)=>{
       console.log("new Connected");
       let location = url.parse(context.req.url, true);
       let token=location.query.token;
       if(!token){
           client.send("invalid token");
           client.close(1003, "invalid token");
           return;
       }
       context.client.token=token;
       next();
    });
easySocket
    .listen(config)

console.log('Now start WebSocket server on port ' + config.port + '...')

使用messageUse可以注册多个处理消息的中间件,比如

 easySocket.messageUse((context, next) => {
    //群聊处理中间件
    if (context.req.action === 'roomChatMessage') {
      //可以在这里持久化消息,将消息发送给其它群聊客户端
      console.log(context.req);
    }
    next();
  })
  .messageUse((context, next) => {
    //私聊处理中间件
    if (context.req.action === 'privateChatMessage') {
      //可以在这里持久化消息,将消息发送给私聊客户端
      console.log(context.req);
    }
    next();
  })

每个中间件都要判断context.req.action,而这个context.res就是浏览器端或客户端发送的数据。怎么消除这个频繁的if判断呢? 我们实现一个简单的消息处理路由。

路由

定义消息路由中间件

messageRouteMiddleware.js

export default (routes) => {
    return async (context, next) => {
        if (routes[context.req.action]) {
            await routes[context.req.action](context,next);
        } else {
            console.log(context.req)
            next();
        }
    }
}

定义路由

router.js

export default {
    roomChatMessage:function(context,next){
        //可以在这里持久化消息,将消息发送给其它群聊客户端,以及其它业务逻辑
        console.log(context.req);
        next();
    },
    privateChatMessage:function(context,next){
        //可以在这里持久化消息,将消息发送给私聊客户端,以及其它业务逻辑
        console.log(context.req);
        next();
    }
}

使用:

easySocket.messageUse(messageRouteMiddleware(router))

跨进程事件

上一篇文章已经介绍了跨进程事件,这里直接说实现。

使用Node的原生事件模块

import compose from './compose';
const WebSocket = require('ws');
var EventEmitter = require('events').EventEmitter;
export default class EasySocket extends EventEmitter {
    constructor() {
        ...
        this.remoteEmitMiddleware = [];

        ...
        this.remoteEmitFn = Promise.resolve();
    }
    ...
    remoteEmitUse(fn, runtime) {
        this.remoteEmitMiddleware.push(fn);
        if (runtime) {
            this.remoteEmitFn = compose(this.remoteEmitMiddleware);
        }
        return this;
    }
    listen(config) {
        this.socket = new WebSocket.Server(config);
        ...
        this.remoteEmitFn = compose(this.remoteEmitMiddleware);

        ...
    }
    emit(event, args, isLocal = false) { 
        let arr = [event, args];
        if (isLocal) {
            super.emit.apply(this, arr);
            return this;
        }
        let evt = {
            event: event,
            args: args
        }
        let remoteEmitContext = { server: this, event: evt };
        this.remoteEmitFn(remoteEmitContext).catch(error => { console.log(error) })
        return this;
    }
}

最后

源码地址:easy-socket-node

基于easy-socket-nodeeasy-socket-browser一个完整例子:

index.html

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
</head>

<body>
</body>
<script src="https://unpkg.com/easy-socket-browser@1.1.1/lib/easy-socket.min.js"></script>
<script>
    <!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
</head>

<body>
</body>
<script src="https://unpkg.com/easy-socket-browser@1.1.1/lib/easy-socket.min.js"></script>
<script>
    var client = new EasySocket({
        name: 'demo',
        autoReconnect: true,
        pingMsg: '{"type":"event","event":"ping","args":"ping"}'//模拟emit 消息体
    });
    client.openUse((context, next) => {
        console.log("open");
        next();
    })
        .closeUse((context, next) => {
            console.log("close");
            next();
        }).errorUse((context, next) => {
            console.log("error", context.event);
            next();
        }).messageUse((context, next) => {
            if (context.res.type === 'event') {
                context.client.emit(context.res.event, context.res.args, true);
            }
            next();
        })
        .reconnectUse((context, next) => {
            console.log('正在进行重连')
            next();
        })
        .remoteEmitUse((context, next) => {
            let client = context.client;
            let event = context.event;
            if (client.socket.readyState !== 1) {
                console.log("连接已断开");
            } else {
                client.socket.send(JSON.stringify({
                    type: 'event',
                    event: event.event,
                    args: event.args
                }));
                next();
            }
        });


    client.connect('ws://localhost:3001');
    var msg = 1;
    setInterval(() => {
        client.emit('chatMessage', msg++)
    }, 3000);
    client.on("serverMessage", (data) => {
        console.log("serverMessage:" + data)
    });

</script>

</html>

</script>

</html>

server.js

var EasySocket = require('easy-socket-node').default;
var config = {
    port: 3001,
    perMessageDeflate: {
        zlibDeflateOptions: { // See zlib defaults.
            chunkSize: 1024,
            memLevel: 7,
            level: 3,
        },
        zlibInflateOptions: {
            chunkSize: 10 * 1024
        },
        // Other options settable:
        clientNoContextTakeover: true, // Defaults to negotiated value.
        serverNoContextTakeover: true, // Defaults to negotiated value.
        //clientMaxWindowBits: 10,       // Defaults to negotiated value.
        serverMaxWindowBits: 10,       // Defaults to negotiated value.
        // Below options specified as default values.
        concurrencyLimit: 10,          // Limits zlib concurrency for perf.
        threshold: 1024,               // Size (in bytes) below which messages
        // should not be compressed.
    }
}

var remoteEmitMiddleware = (context, next) => {
    var server = context.server;
    var event = context.event;
    for (let client of server.clients.values()) {
        client.readyState == 1 && client.send(makeEventMessage(event));
    }
}
function makeEventMessage(event) {
    return JSON.stringify({
        type: 'event',
        event: event.event,
        args: event.args
    })
}
var messageRouteMiddleware = (routes) => {
    return (context, next) => {
        if (context.req.type === 'event') {
            if (routes[context.req.event]) {
                routes[context.req.event](context, next);
            } else {
                context.server.emit(context.req.event, context.req.args);//将会直接触发remoteEmitMiddleware 中间件的调用
                next();
            }
        } else {
            next();
        }
    }
}
var router = {
    chatMessage: (context, next) => {
        var req = context.req;
        context.server.emit('serverMessage', req.args);
    }
}
var server = new EasySocket();
server
    .connectionUse((context, next) => {
        context.server.clients.set(1, context.client)
        console.log('new connection')
    })
    .closeUse((context, next) => {
        console.log('close')
    })
    .messageUse(messageRouteMiddleware(router))
    .remoteEmitUse(remoteEmitMiddleware)
    .listen(config)

console.log('Now start WebSocket server on port ' + config.port + '...')

运行过程,可以停止后端服务,然后再启动,测下心跳重连

实现的聊天室例子:online chat demo

聊天室前端源码:lazy-mock-im

聊天室服务端源码:lazy-mock

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容