go-tcpsvr

go语言用于制作socket相关的事情是非常方便的。在底层框架直接有支持。在这里编写一个简单版本的服务器通讯逻辑。参考了leaf的代码。

package main

import (
    "encoding/binary"
    "fmt"
    "io"
    "net"
    "os"
    "os/signal"
    "sync"
    "time"
)

type (
    // 会话对象
    session struct {
        sync.Mutex
        conn      net.Conn
        writeChan chan []byte
        closeFlag bool
        owner     *server
    }

    SessionSet map[*session]struct{}

    // 服务器对象
    server struct {
        sync.Mutex
        tempDelay  time.Duration
        ln         net.Listener
        wgLn       sync.WaitGroup
        mutexConns sync.Mutex
        conns      SessionSet
        wgSession  sync.WaitGroup
    }
)

func (this *server) Close() {
    fmt.Printf("server.Close() address: %v\n", this.ln.Addr().String())
    this.ln.Close()
    this.wgLn.Wait()

    this.mutexConns.Lock()
    for conn := range this.conns {
        conn.Close()
    }
    this.conns = nil
    this.mutexConns.Unlock()
    this.wgSession.Wait()
    fmt.Printf("server.Close() done\n", this.ln.Addr().String())
}

func (this *server) ProcTempErr() {
    tempDelay := this.tempDelay
    if tempDelay == 0 {
        tempDelay = 5 * time.Millisecond
    } else {
        tempDelay *= 2
    }
    if max := 1 * time.Second; tempDelay > max {
        tempDelay = max
    }
    fmt.Printf("server.ProcTempErr accept retrying in %v\n", tempDelay)
    time.Sleep(tempDelay)
    this.tempDelay = tempDelay

}

func (this *server) onNewSession(conn net.Conn) {
    fmt.Printf("server.onNewSession remote %v\n", conn.RemoteAddr().String())

    c := &session{conn: conn, closeFlag: false, owner: this}
    c.writeChan = make(chan []byte, 512*1024)

    this.mutexConns.Lock()
    this.conns[c] = struct{}{}
    this.mutexConns.Unlock()

    go c.goRecvData(this.wgSession)
    go c.goSendData(this.wgSession)
    fmt.Println("server.onNewSession done")

    c.PostData(`I had seen little of Holmes lately.
My marriage had drifted us away from each other.`)
}

func (this *server) onSessionLost(conn *session) {
    fmt.Println("server.onSessionLost")
    this.mutexConns.Lock()
    if _, ok := this.conns[conn]; ok {
        delete(this.conns, conn)
        fmt.Println("server.onSessionLost done")
    }
    this.mutexConns.Unlock()
}

func (this *server) runAccept() {
    this.wgLn.Add(1)
    defer this.wgLn.Done()
    for {
        conn, err := this.ln.Accept()
        if err != nil {
            if ne, ok := err.(net.Error); ok && !ne.Temporary() {
                fmt.Printf("Accept err: %v\n", ne.Error())
                return
            }
            this.ProcTempErr()
            continue
        }
        this.onNewSession(conn)
    }
    fmt.Println("server.runAccept done")
}

func (this *server) Listen() {
    ln, err := net.Listen("tcp4", "0.0.0.0:8089")
    if err != nil {
        fmt.Println("listen fail: ", err)
        return
    }
    this.ln = ln
    this.conns = make(SessionSet)
    go this.runAccept()
    fmt.Printf("server.Listen address: %v done\n", ln.Addr().String())
}

func (this *session) PostData(data string) {
    msgLen := len(data)
    buf := []byte(data)
    msg := make([]byte, 2+msgLen)
    binary.LittleEndian.PutUint16(msg, uint16(msgLen))
    copy(msg[2:], buf)

    this.Lock()
    if !this.closeFlag {
        this.writeChan <- msg
    }
    this.Unlock()
    fmt.Printf("session.PostData address: %v, data size: %v\n",
        this.conn.RemoteAddr().String(), len(msg))
}

func (this *session) Close() {
    fmt.Println("session.Close")
    this.innorCloseSock()
}

func (this *session) innorCloseSock() {
    this.Lock()
    if !this.closeFlag {
        this.closeFlag = true
        this.conn.Close()
        close(this.writeChan)
        fmt.Println("session innorCloseSock done")
    }
    this.Unlock()
}

func (this *session) goRecvData(wg sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()
    conn := this.conn
    for {
        data, err := procData(conn)
        if err != nil {
            fmt.Printf("session.goRecvData procData fail, err: %v\n", err)
            break
        }
        this.handleData(data)
    }
    this.innorCloseSock()
}

func (this *session) goSendData(wg sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()
    conn := this.conn

    for b := range this.writeChan {
        if b == nil {
            fmt.Println("session.goSendData b is nil")
            break
        }
        _, err := conn.Write(b)
        if err != nil {
            fmt.Printf("session.goSendData Write fail, err: %v\n", err)
            break
        }
    }
    this.owner.onSessionLost(this)
    this.innorCloseSock()
}

func procData(conn net.Conn) ([]byte, error) {
    var bufHeader []byte = make([]byte, 2, 2)
    if _, err := io.ReadFull(conn, bufHeader); err != nil {
        fmt.Printf("procData io.ReadFull header fail, err: %v\n", err.Error())
        return nil, err
    }
    var msgLen uint32
    msgLen = uint32(binary.LittleEndian.Uint16(bufHeader))
    fmt.Printf("procData header get size: %v\n", msgLen)
    msgData := make([]byte, msgLen)
    if _, err := io.ReadFull(conn, msgData); err != nil {
        fmt.Printf("procData io.ReadFull body fail, err: %v\n", err.Error())
        return nil, err
    }
    fmt.Printf("procData get data, size: %v\n", len(msgData))
    return msgData, nil
}

func (this *session) handleData(data []byte) {
    fmt.Printf("session.handleData remote address: %v, recv data size: %v\n",
        this.conn.RemoteAddr().String(), len(data))
    content := string(data)
    fmt.Println(content)
}

func main() {
    svr := &server{}
    svr.Listen()

    // close
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, os.Kill)
    <-c

    svr.Close()
}

测试代码:

#! /usr/bin/python3
# coding: utf-8
import asyncore, socket,struct
import time
import logging

logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S')

msgstr = '''One night---it was on the twentieth of March,1888---I was returning
from a journey to a patient.
'''

class Client( asyncore.dispatcher ):

    def __init__( self, _host, _port ):
        asyncore.dispatcher.__init__( self )
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect( ( _host, _port ) )
        self.recv_buf_ = b''
        self.send_buf_ = b''
        self.post_data(msgstr)

    def post_data(self, _raw_buf):
        #
        # | short      | binary            |
        # | pack size  | encrypt binary    |
        # | binary data|                   |
        # small-endian unsigned int
        # 
        self.send_buf_ = self.send_buf_ + struct.pack("<h",len(_raw_buf))
        # can't concat str to bytes
        self.send_buf_ = self.send_buf_ + _raw_buf.encode()

    def handle_connect( self ):
        logging.debug("on handle_connect establishment")

    def handle_close( self ):
        self.close()
        logging.error("on handle_close socket cannt connect")

    def handle_read( self ):
        self.recv_buf_ += self.recv( 8192 )
        logging.debug('on handle_read read size: {0}'.format(len(self.recv_buf_)))
        #
        # | short      | binary            |
        # | pack size  | encrypt binary    |
        # | binary data|                   |
        # small-endian unsigned int
        # 
        while len(self.recv_buf_) > 2:
            if len(self.recv_buf_) < 2:
                return True
            stream_size = struct.unpack( "<h", self.recv_buf_[:2])[0]
            logging.debug("handle_read stream_size: {0}".format(stream_size))
            self.proc_data(self.recv_buf_[2:2+stream_size])
            self.recv_buf_ = self.recv_buf_[2+stream_size:]

    def writeable( self ):
        return ( len( self.send_buf_ ) > 0 )

    def handle_write( self ):
        if not self.writeable():
            time.sleep(1)
            return
        sent = self.send( self.send_buf_ )
        logging.info("handle_write, sent: %d"%sent)
        self.send_buf_ =  self.send_buf_[ sent: ]
        pass
    
    def proc_data(self,content):
        logging.debug("proc_data content: {}".format(content))

if __name__ == "__main__":
    logging.info("robot is launch")
    clients = []
    for i in range( 0, 100 ):
        clients.append( Client( '127.0.0.1', 8089) )
    asyncore.loop()
    pass

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容

  • 1. 分布式系统核心问题 参考书籍:《区块链原理、设计与应用》 一致性问题例子:两个不同的电影院买同一种电影票,如...
    molscar阅读 904评论 0 0
  • 我们奢望奇迹,是因为我们往往误以为只有我们才有可能获得奇迹。若是你能够享受奇迹带给你的好,那么你也必须承受奇迹发生...
    AshTsang阅读 120评论 0 0
  • 云泄露:简单的云服务器配置操作,蕴含巨大商业风险云泄漏是指存储在云服务器中的敏感数据毫无防备的暴露在互联网上。云泄...
    溪边的墓志铭阅读 146评论 0 0
  • 雪(其一) 文/柏红忠 绽放傲寒冬,最喜伴青松。 盈手玉飞燕,风飘白鹅绒。 雪(其二) 文/柏红忠 寒冬花玉容,乘...
    柏红忠阅读 1,335评论 24 71
  • 三亚很多人想去旅游的地方,他们选择三亚的原因各有不同。这篇文章中描写了三亚的三个地方,每个地方都有自己的特色。虽然...
    7738d4790f81阅读 189评论 0 0