一个简单的golang TCP通信例子

前言

新工作接手了公司的一个使用golang编写的agent程序,用于采集各个机器的性能指标和监控数据,之前使用http实现数据的上传,最近想把它改成tcp上传的方式,由于是新手上路,顺手写了一个小demo程序。

这个程序中包含:

  • 简单的TcpServer服务程序:侦听,数据收发与解析

  • 简单的客户端程序:数据收发与解析

服务器

与正常的其他语言一样,go中也提供了丰富的网络相关的包,按照正常的套路,它是这样的:

  1. 绑定端口,初始化套接字

  2. 启动侦听,开启后台线程接收客户端请求

  3. 接收请求,针对每个请求开启一个线程来处理通信

  4. 资源回收

golang的套路也是如此,不同的地方在于它可以使用goroutine来替换上面的线程;

整体的代码很简单,可以参考文档和api手册,示例代码如下:

package main

import (
    "fmt"
    "net"
    "os"
    "encoding/json"
    "bufio"
    "hash/crc32"
    "io"
)
//数据包的类型
const (
    HEART_BEAT_PACKET = 0x00
    REPORT_PACKET = 0x01
)

var (
    server = "127.0.0.1:8080"
)
//这里是包的结构体,其实是可以不需要的
type Packet struct {
    PacketType      byte
    PacketContent     []byte
}
//心跳包,这里用了json来序列化,也可以用github上的gogo/protobuf包
//具体见(https://github.com/gogo/protobuf)
type HeartPacket struct {
    Version     string`json:"version"`
    Timestamp   int64`json:"timestamp"`
}
//正式上传的数据包
type ReportPacket struct {
    Content   string`json:"content"`
    Rand         int`json:"rand"`
    Timestamp   int64`json:"timestamp"`
}
//与服务器相关的资源都放在这里面
type TcpServer struct {
    listener       *net.TCPListener
    hawkServer  *net.TCPAddr
}

func main() {
    //类似于初始化套接字,绑定端口
    hawkServer, err := net.ResolveTCPAddr("tcp", server)
    checkErr(err)
    //侦听
    listen, err := net.ListenTCP("tcp", hawkServer)
    checkErr(err)
    //记得关闭
    defer listen.Close()
    tcpServer := &TcpServer{
        listener:listen,
        hawkServer:hawkServer,
    }
    fmt.Println("start server successful......")
    //开始接收请求
    for {
        conn, err := tcpServer.listener.Accept()
        fmt.Println("accept tcp client %s",conn.RemoteAddr().String())
        checkErr(err)
        // 每次建立一个连接就放到单独的协程内做处理
        go Handle(conn)
    }
}
//处理函数,这是一个状态机
//根据数据包来做解析
//数据包的格式为|0xFF|0xFF|len(高)|len(低)|Data|CRC高16位|0xFF|0xFE
//其中len为data的长度,实际长度为len(高)*256+len(低)
//CRC为32位CRC,取了最高16位共2Bytes
//0xFF|0xFF和0xFF|0xFE类似于前导码
func Handle(conn net.Conn) {
    // close connection before exit
    defer conn.Close()
    // 状态机状态
    state := 0x00
    // 数据包长度
    length := uint16(0)
    // crc校验和
    crc16 := uint16(0)
    var recvBuffer []byte
    // 游标
    cursor := uint16(0)
    bufferReader := bufio.NewReader(conn)
    //状态机处理数据
    for {
        recvByte,err := bufferReader.ReadByte()
        if err != nil {
            //这里因为做了心跳,所以就没有加deadline时间,如果客户端断开连接
            //这里ReadByte方法返回一个io.EOF的错误,具体可考虑文档
            if err == io.EOF {
                fmt.Printf("client %s is close!\n",conn.RemoteAddr().String())
            }
            //在这里直接退出goroutine,关闭由defer操作完成
            return
        }
        //进入状态机,根据不同的状态来处理
        switch state {
        case 0x00:
            if recvByte == 0xFF {
                state = 0x01
                //初始化状态机
                recvBuffer = nil
                length = 0
                crc16 = 0
            }else{
                state = 0x00
            }
            break
        case 0x01:
            if recvByte == 0xFF {
                state = 0x02
            }else{
                state = 0x00
            }
            break
        case 0x02:
            length += uint16(recvByte) * 256
            state = 0x03
            break
        case 0x03:
            length += uint16(recvByte)
            // 一次申请缓存,初始化游标,准备读数据
            recvBuffer = make([]byte,length)
            cursor = 0
            state = 0x04
            break
        case 0x04:
            //不断地在这个状态下读数据,直到满足长度为止
            recvBuffer[cursor] = recvByte
            cursor++
            if(cursor == length){
                state = 0x05
            }
            break
        case 0x05:
            crc16 += uint16(recvByte) * 256
            state = 0x06
            break
        case 0x06:
            crc16 += uint16(recvByte)
            state = 0x07
            break
        case 0x07:
            if recvByte == 0xFF {
                state = 0x08
            }else{
                state = 0x00
            }
        case 0x08:
            if recvByte == 0xFE {
                //执行数据包校验
                if (crc32.ChecksumIEEE(recvBuffer) >> 16) & 0xFFFF == uint32(crc16) {
                    var packet Packet
                    //把拿到的数据反序列化出来
                    json.Unmarshal(recvBuffer,&packet)
                    //新开协程处理数据
                    go processRecvData(&packet,conn)
                }else{
                    fmt.Println("丢弃数据!")
                }
            }
            //状态机归位,接收下一个包
            state = 0x00
        }
    }
}

//在这里处理收到的包,就和一般的逻辑一样了,根据类型进行不同的处理,因人而异
//我这里处理了心跳和一个上报数据包
//服务器往客户端的数据包很简单地以\n换行结束了,偷了一个懒:),正常情况下也可根据自己的协议来封装好
//然后在客户端写一个状态来处理
func processRecvData(packet *Packet,conn net.Conn)  {
    switch packet.PacketType {
    case HEART_BEAT_PACKET:
        var beatPacket HeartPacket
        json.Unmarshal(packet.PacketContent,&beatPacket)
        fmt.Printf("recieve heat beat from [%s] ,data is [%v]\n",conn.RemoteAddr().String(),beatPacket)
        conn.Write([]byte("heartBeat\n"))
        return
    case REPORT_PACKET:
        var reportPacket ReportPacket
        json.Unmarshal(packet.PacketContent,&reportPacket)
        fmt.Printf("recieve report data from [%s] ,data is [%v]\n",conn.RemoteAddr().String(),reportPacket)
        conn.Write([]byte("Report data has recive\n"))
        return
    }
}
//处理错误,根据实际情况选择这样处理,还是在函数调之后不同的地方不同处理
func checkErr(err error) {
    if err != nil {
        fmt.Println(err)
        os.Exit(-1)
    }
}

<strong>特别需要注意:</strong>

Handle方法在一个死循环中使用了一个无阻塞的buff来读取套接字中的数据,因此当客户端主动关闭连接时,如果不对这个io.EOF进行处理,会导致这个goroutine空转,疯狂吃cpu,在这里io.EOF的处理非常重要:)

客户端

客户端与一般的TCP通信程序一样,它需要完成的工作有:

  1. 向服务器发送心跳包

  2. 向服务器发送数据包

  3. 接收服务器的数据包

需要注意的就是客户端与服务端的数据协议保持一致,请在开始发送数据之前启动数据接收

上面的3个工作我分别用了goroutine来做,整体的代码如下:

package main

import (
    "os"
    "fmt"
    "net"
    "time"
    "math/rand"
    "encoding/json"
    "bufio"
    "hash/crc32"
    "sync"
)
//数据包类型
const (
    HEART_BEAT_PACKET = 0x00
    REPORT_PACKET = 0x01
)
//默认的服务器地址
var (
    server = "127.0.0.1:9876"
)
//数据包
type Packet struct {
    PacketType      byte
    PacketContent   []byte
}
//心跳包
type HeartPacket struct {
    Version     string`json:"version"`
    Timestamp   int64`json:"timestamp"`
}
//数据包
type ReportPacket struct {
    Content   string`json:"content"`
    Rand         int`json:"rand"`
    Timestamp   int64`json:"timestamp"`
}

//客户端对象
type TcpClient struct {
    connection     *net.TCPConn
    hawkServer  *net.TCPAddr
    stopChan       chan struct{}
}

func main()  {
    //拿到服务器地址信息
    hawkServer,err := net.ResolveTCPAddr("tcp", server)
    if err != nil {
        fmt.Printf("hawk server [%s] resolve error: [%s]",server,err.Error())
        os.Exit(1)
    }
    //连接服务器
    connection,err := net.DialTCP("tcp",nil,hawkServer)
    if err != nil {
        fmt.Printf("connect to hawk server error: [%s]",err.Error())
        os.Exit(1)
    }
    client := &TcpClient{
        connection:connection,
        hawkServer:hawkServer,
        stopChan:make(chan struct{}),
    }
    //启动接收
    go client.receivePackets()
    
    //发送心跳的goroutine
    go func() {
        heartBeatTick := time.Tick(2 * time.Second)
        for{
            select {
            case <-heartBeatTick:
                client.sendHeartPacket()
            case <-client.stopChan:
                return
            }
        }
    }()
    
    //测试用的,开300个goroutine每秒发送一个包
    for i:=0;i<300;i++ {
        go func() {
            sendTimer := time.After(1 * time.Second)
            for{
                select {
                case <-sendTimer:
                    client.sendReportPacket()
                    sendTimer = time.After(1 * time.Second)
                case <-client.stopChan:
                    return
                }
            }
        }()
    }
    //等待退出
    <-client.stopChan
}

// 接收数据包
func (client *TcpClient)receivePackets()  {
    reader := bufio.NewReader(client.connection)
    for {
    //承接上面说的服务器端的偷懒,我这里读也只是以\n为界限来读区分包
        msg, err := reader.ReadString('\n')
        if err != nil {
            //在这里也请处理如果服务器关闭时的异常
            close(client.stopChan)
            break
        }
        fmt.Print(msg)
    }
}
//发送数据包
//仔细看代码其实这里做了两次json的序列化,有一次其实是不需要的
func (client *TcpClient)sendReportPacket()  {
    reportPacket := ReportPacket{
        Content:getRandString(),
        Timestamp:time.Now().Unix(),
        Rand:rand.Int(),
    }
    packetBytes,err := json.Marshal(reportPacket)
    if err!=nil{
        fmt.Println(err.Error())
    }
        //这一次其实可以不需要,在封包的地方把类型和数据传进去即可
    packet := Packet{
        PacketType:REPORT_PACKET,
        PacketContent:packetBytes,
    }
    sendBytes,err := json.Marshal(packet)
    if err!=nil{
        fmt.Println(err.Error())
    }
    //发送
    client.connection.Write(EnPackSendData(sendBytes))
    fmt.Println("Send metric data success!")
}

//使用的协议与服务器端保持一致
func EnPackSendData(sendBytes []byte) []byte {
    packetLength := len(sendBytes) + 8
    result := make([]byte,packetLength)
    result[0] = 0xFF
    result[1] = 0xFF
    result[2] = byte(uint16(len(sendBytes)) >> 8)
    result[3] = byte(uint16(len(sendBytes)) & 0xFF)
    copy(result[4:],sendBytes)
    sendCrc := crc32.ChecksumIEEE(sendBytes)
    result[packetLength-4] = byte(sendCrc >> 24)
    result[packetLength-3] = byte(sendCrc >> 16 & 0xFF)
    result[packetLength-2] = 0xFF
    result[packetLength-1] = 0xFE
    fmt.Println(result)
    return result
}
//发送心跳包,与发送数据包一样
func (client *TcpClient)sendHeartPacket() {
    heartPacket := HeartPacket{
        Version:"1.0",
        Timestamp:time.Now().Unix(),
    }
    packetBytes,err := json.Marshal(heartPacket)
    if err!=nil{
        fmt.Println(err.Error())
    }
    packet := Packet{
        PacketType:HEART_BEAT_PACKET,
        PacketContent:packetBytes,
    }
    sendBytes,err := json.Marshal(packet)
    if err!=nil{
        fmt.Println(err.Error())
    }
    client.connection.Write(EnPackSendData(sendBytes))
    fmt.Println("Send heartbeat data success!")
}
//拿一串随机字符
func getRandString()string  {
    length := rand.Intn(50)
    strBytes := make([]byte,length)
    for i:=0;i<length;i++ {
        strBytes[i] = byte(rand.Intn(26) + 97)
    }
    return string(strBytes)
}

后记

测试过程中,一共开了7个client,共计2100个goroutine,本机启动服务器端,机器配置为i-5/8G的情况下,整体的资源使用情况如下:

测试结果.png

需要改进的地方,也是后两篇的主题:

  • 引入内存池
  • 服务无缝重启
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,681评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,710评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,623评论 0 334
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,202评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,232评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,368评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,795评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,461评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,647评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,476评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,525评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,226评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,785评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,857评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,090评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,647评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,215评论 2 341

推荐阅读更多精彩内容