用Python操作nanomsg(二)——PipeLine

PipeLine基本用法

运行效果

用Python操作nanomsg(一)——准备之后,本文着重介绍nanomsg的PipeLine通信模式。

在PipeLine模式中,Socket Type为NN_PUSH的可以发送,Socket Type为NN_PULL的可以接收:

建立PUSH节点发送数据(Establish PUSH node to send data)

import nnpy
node0 = nnpy.Socket(nnpy.AF_SP, nnpy.PUSH)
node0.bind('tcp://*:4000')
# 发送数据
node0.send(bytes('你好', encoding='utf-8'))

建立PULL节点接收数据(Establish PULL node to receive data)

import nnpy
node1 = nnpy.Socket(nnpy.AF_SP, nnpy.PULL)
node1.connect('tcp://127.0.0.1:4000')
# 接收数据
recv_data = node1.recv()
data = recv_data.decode('utf-8')

注意的是发送和接收的数据都是二进制,发送前和接收后分别要进行编码和解码,这里使用utf-8作为统一格式。
没错就是这么简单,直接开始项目练手。

PipeLine练习项目

单向管道局域网聊天程序(PipeLine LAN-chat program)

这个LAN-chat program是一个命令行程序,通过子命令在PUSH或PULL模式下运行。

编写cli入口

首先明确我们的子命令和其参数,先简单一些,实现发送文字消息功能需要明确的内容为如下三部分:

子命令(sub-commands)

mode sub-command command description
PUSH bind chat.py bind [protocol] [addr] run by server
PULL connect chat.py connect [--keep-alive] [protocol] [addr] run by client

主要由两个子命令bindconnect组成,分别启动push server和pull client两个不同角色。

子命令参数(arguments of sub-commands)

argument default value description
protocol tcp inproc/tcp/udp/ws
addr *:4000 for PUSH and 127.0.01:4000 for PULL
--keep-alive False (未完善)

参数默认值定义在config.py中,默认protocol=tcpaddr=*:4000(PULL模式为127.0.0.1:4000)。

标志(flags)

flag variable name default value send by react by action
FLAG_CLIENT_OFFLINE client-offline-now PUSH PULL PULL节点结束进程
FLAG_SERVER_EXIT server-exit-now PUSH PUSH PUSH节点结束进程

以上flag作用于其被作为消息发送时可触发对应的动作。

关于包含子命令可参考前期文章:给Python脚本带上子命令(sub-commands),基于此可得到cli入口文件如下:

# _*_coding:utf-8 _*_
# @Time    : 2020/2/5 18:41
# @Author  : Shek 
# @FileName: OneWayPipe_CLI.py
# @Software: PyCharm
import argparse
from module.func import *
import config

parser = argparse.ArgumentParser(description=config.PROGRAM_DESCRIPTION)
subparsers = parser.add_subparsers()

# command 'bind'
cmd_bind = subparsers.add_parser('bind', help=config.H_BIND)
cmd_bind.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_BIND_PROTOCOL)
cmd_bind.add_argument('addr', action='store', nargs='?', default=config.BIND_ADDR, help=config.H_BIND_ADDR)
cmd_bind.set_defaults(func=sub_cmd_bind)

# command 'connect'
cmd_connect = subparsers.add_parser('connect', help=config.H_CONNECT)
cmd_connect.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_CONNECT_PROTOCOL)
cmd_connect.add_argument('addr', action='store', nargs='?', default=config.CONNECT_ADDR, help=config.H_CONNECT_ADDR)
cmd_connect.add_argument('--keep-alive', action='store_true', help=config.H_CONNECT_KEEP_ALIVE)
cmd_connect.set_defaults(func=sub_cmd_connect)

args = parser.parse_args()  # 处理输入的参数
if not hasattr(args, 'func'):
    # 无参数时跳转到-h
    #否则会提示 namespace object has not attribute 'func',故这里用hasattr()判断
    args = parser.parse_args(['-h'])
args.func(args)  # 跳转到对应的函数

sub_cmd_bind:服务端消息处理函数

1.创建日志写入对象,这是我整合的一个自用类,日志可同时写入到本地文件和输出到终端,用来替代printf():

# 1 initialize a logger
log = logger.Logger(config.LOG_NAME_PUSH)

2.创建nnpy.Socket对象,装入nnpy.PUSH到其参数protocol中,绑定本地地址tcp://*:4000:

# 2 create object
push_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUSH)
# 3 establish a sever to push message
log.info('binding to {}://{} ...'.format(arguments.protocol, arguments.addr))
result = push_server.bind('{}://{}'.format(arguments.protocol, arguments.addr))
# bind status
log.info('success') if result else log.info('failed') and exit(0)

3.建立消息发送循环:

# 4 push loop
time.sleep(0.5)
while True:
    content = input('Send>')
    # send message/data/command
    send_result = push_server.send(bytes(content, encoding=config.DATA_ENCODING))
# 5 close server
push_server.close()

同Socket一样,网络中传输过程中统一走的是二进制,故发送和接收需要进行编码和解码:

str转二进制:bin = bytes(content, encoding='utf-8')

二进制转str:content = bin.decode('utf-8')

但是这个循环是有问题的:这个While跳不出来,所以我们需要设置一个标志信息,当发送这个标志信息的时候退出服务端的消息循环,这是前面设置flags的目的。硬件设计中flag通常设置为整数,如0x77,这里方便起见直接使用字符串即可。

4.加入对flag的判断:

content = input('Send({})>'.format(config.COUNT_SEND_SUCCESS))
# process input message/command
if content == config.FLAG_SERVER_EXIT:
    # exit command caught, break loop
    log.info(config.L_SERVER_EXIT)
    break
else:
    # send message/data/command
    ...

5.消息发送次数统计:

# send message/data/command
send_result = push_server.send(bytes(content, encoding=config.DATA_ENCODING))
if send_result:  # success
    config.COUNT_SEND_SUCCESS += 1
else:  # failed (warning: in push / pull mode will never reach here)
    config.COUNT_SEND_FAILED += 1
    log.warning('{}:{}'.format(config.L_SERVER_SEND_FAILED_PREFIX, content))

发送次数统计仅为美化,另外这里发送后成功的返回值是什么,所以这里的发送成功统计暂时没有意义,约等于发送次数统计。

6.完成:


sub_cmd_bind函数完整代码

sub_cmd_connect:编写客户端消息处理函数

1.同样先创建日志写入对象:

# 1 initialize a logger
log = logger.Logger(config.LOG_NAME_PULL)

2.创建nnpy.Socket对象,装入nnpy.PULL到其参数protocol中,连接地址tcp://127.0.0.1:4000(这里keep_alive参数暂时没有利用起来):

# 2 create object
pull_client = nnpy.Socket(nnpy.AF_SP, nnpy.PULL)
# not completed yet
if arguments.keep_alive:
    print(config.I_KEEP_ALIVE_ENABLED)
# 3 connect to a server for receiving message
log.info('connecting to {}://{}'.format(arguments.protocol, arguments.addr))
result = pull_client.connect('{}://{}'.format(arguments.protocol, arguments.addr))
# connect status
log.info(config.I_OP_SUCCESS) if result else log.info(config.I_OP_FAILED) and exit(0)

4.创建消息接受循环,根据服务端的经验,同样加入跳出标志,一个是客户端本机按Ctrl + C时利用触发的KeyboardInterrupt异常跳出循环,另一个是接收服务端发送的FLAG_CLIENT_OFFLINE标志信息自行下线:

# 4 receive in loop
time.sleep(0.5)
while True:
    try:
        recv_data = pull_client.recv()
        if recv_data:
            decoded_data = recv_data.decode(config.DATA_ENCODING)
            # 3 process received data
            if decoded_data == config.FLAG_CLIENT_OFFLINE:
                # receive a go-offline flag from server, break loop
                log.info(config.L_CLIENT_FLAG_OFFLINE_DETECTED)
                break
            # display message push by server
            print('{} {}'.format(current_datetime(), decoded_data))
            # logging to text file
            log.debug(decoded_data)
    except KeyboardInterrupt:
        # ctrl + c detected
        log.info(config.L_CLIENT_CTRL_C)
        break

# 5 close client
pull_client.close()
log.info(config.L_CLIENT_CLOSED)

5.完成:

sub_cmd_connect函数完整代码

测试运行

主要工作已完成,可能有眼细的同学看到上述代码中有很多config.开头的变量,这是为了方便日后维护,故把配置信息单独放在config.py中。


flag都定义在config.py中
运行效果:v0.0.5 测试版本1, 0ac4bf57 on 2020/2/6 at 22:55

总结

上述虽然实现了基本的通信功能,但是很明显PipeLine模式不能满足“局域网聊天”的这个需要,首先它不支持双向通信,也不支持异步收发。在后续的模式测试中我们再对其进行改进。


本系列其他文章:

内容 文章地址 说明
准备 用Python操作nanomsg(一)——准备 2020.2.7更新
PushPub 用Python操作nanomsg(三)——PubSub 2020.2.8更新
Pair 用Python操作nanomsg(四)——Pair 未开始
ReqRep 用Python操作nanomsg(五)——ReqRep 未开始
Survey 用Python操作nanomsg(六)——Survey 未开始
Bus 用Python操作nanomsg(七)——Bus 未开始
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,612评论 5 471
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,345评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,625评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,022评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,974评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,227评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,688评论 3 392
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,358评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,490评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,402评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,446评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,126评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,721评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,802评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,013评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,504评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,080评论 2 341