1. 前言
libev是一个非常轻量级的事件驱动框架,用来实现一个简单的消息处理服务会非常简单。
2. 软件框架
- 启动一个线程从MQTT Broker接收字符串形式的命令,接收完成后保存到队列,并唤醒主线程从队列处理命令
- 启动一个TCP Server,接收从手机、PC、等终端发送过来的命令,并把接收的命令保存到队列,并唤醒主线程从队列处命令
- 主线程大部分时间处于sleep状态,当消息队列有命令时被唤醒
这种简单的框架在嵌入式中经常能用的上,代码比较简单。
3. 代码路径
https://github.com/jokerC8/msgHandler.git
4. 效果
- 安装依赖
$$ sudo apt install libev-dev
$$ sudo apt install mosquitto-dev
- 安装工具
sudo apt install mosquitto-clients
- 客户端
启动100万个goroutine去连接并发送json消息
package main
import (
"fmt"
"net"
"sync"
"time"
)
var (
count = 1000000
)
func client(addr string) {
wg := sync.WaitGroup{}
wg.Add(count)
for i := 0; i < count; i++ {
conn, err := net.Dial("tcp", addr)
if err != nil {
fmt.Println(err)
return
}
go func(c net.Conn) {
defer func() {
c.Close()
wg.Done()
}()
messages := []string{
`{"open":"door"}`,
`{"close":"light"}`,
`{"change":"color"}`,
`{"report":"power"}`,
`{"move":"down"}`,
`{"move":"up"}`,
`{"move":"left"}`,
`{"move":"right"}`,
}
message := messages[time.Now().UTC().Second() % len(messages)]
length := len(message)
data := make([]byte, 4 + length)
data[0] = 0x0a
data[1] = 0x05
data[2] = uint8(length>>8);
data[3] = uint8(length)
copy(data[4:], []byte(message))
c.Write(data)
} (conn)
}
wg.Wait()
}
func main() {
client("127.0.0.1:10080")
}
- 发送订阅消息
#!/bin/bash
function client()
{
while true; do
mosquitto_pub -h 127.0.0.1 -p 1883 -t "/parent/child/topic" -m "{\"hello\":\"world\"}"
sleep 0.1
done
}
client