基于BottledWater-PG+nodejs实时地图应用实践

前文作者讲述了BottledWater-PG安装部署,并在pg中实现了数据改变,向kafka发送消息的案例,详细参考《BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台》。此前作者写过一篇pg的异步消息实现的实时地图应用案例《postgres+socket.io+nodejs实时地图应用实践》,本文将改用BottledWater-PG实现一遍。

一 服务器端

var fs = require('fs');
var http = require('http');
var socket = require('socket.io');
var Kafka = require('node-rdkafka');
    
var server = http.createServer(function(req, res) {
    
    res.writeHead(200, { 'Content-type': 'text/html'});
    res.end(fs.readFileSync(__dirname + '/index.html'));
    }).listen(8081, function() {
        console.log('Listening at: http://localhost:8081');
});
//注册socket.io
var socketio=socket.listen(server);
socketio.on('connection', function (socketclient) {
    console.log('已连接socket:');
    //socketclient.broadcast.emit('GPSCoor', data.payload);//广播给别人
    //socketclient.emit('GPSCoor', data.payload);//广播给自己
    
});

var consumer = new Kafka.KafkaConsumer({
  //'debug': 'all',
  'metadata.broker.list': '192.168.43.27:9092',
  'group.id': 'node-rdkafka-consumer-flow-example',
  'enable.auto.commit': false
});

var topicName = 'gps';

//logging debug messages, if debug is enabled 
consumer.on('event.log', function(log) {
  console.log(log);
});

//打印错误
consumer.on('error', function(err) {
  console.error('Error from consumer');
  console.error(err);
});

consumer.on('ready', function(arg) {
  console.log('consumer ready.' + JSON.stringify(arg));
  consumer.subscribe([topicName]);
  //准备消费消息
  consumer.consume();
});

consumer.on('data', function(m) {
    console.log(m);
    let _data;
    if(m.value==null)//delete操作发送来的消息
    {
        _data=JSON.parse(m.key);
        _data.tg_op='delete';
    }
    else{
        _data=m.value.toString();
        _data=JSON.parse(_data);
    }   
    console.log(_data);
    socketio.emit('GPSCoor', _data);//广播给所有的客户端
});

consumer.on('disconnected', function(arg) {
  console.log('consumer disconnected. ' + JSON.stringify(arg));
});

//启动
consumer.connect();

二 客户端

<html>
<head>
    <meta charset='utf-8'>
    <title>实时地图应用</title>
    <link rel="stylesheet" href="http://openlayers.org/en/v3.18.2/css/ol.css" type="text/css">
    <script src="http://openlayers.org/en/v3.18.2/build/ol.js"></script>
    <script src="/socket.io/socket.io.js"></script>
    <script>
            var wktform=new ol.format.WKT();//wkt解析
            var gpsSource=new ol.source.Vector();
            function init(){
                var gpsLayer=new ol.layer.Vector({
                    source:gpsSource,
                    style:new ol.style.Style({
                        image: new ol.style.Icon(({
                            anchor: [0.5, 1],
                            src: 'http://openlayers.org/en/v3.18.2/examples/data/icon.png'
                        }))
                    })
                });
                var map = new ol.Map({
                    layers : [
                        new ol.layer.Tile({
                            title : '街道图',
                            visible : true,
                            source : new ol.source.XYZ({
                                url : 'http://www.google.cn/maps/vt?pb=!1m5!1m4!1i{z}!2i{x}!3i{y}!4i256!2m3!1e0!2sm!3i342009817!3m9!2szh-CN!3sCN!5e18!12m1!1e47!12m3!1e37!2m1!1ssmartmaps!4e0&token=32965'
                            })
                        }),
                        gpsLayer
                    ],
                    target : 'map',
                    controls : ol.control.defaults({
                        attributionOptions : 
                        ({
                            collapsible : false
                        })
                    }),
                    view : new ol.View({
                        center : [0, 0],
                        zoom : 2
                    })
                });
                var iosocket = io.connect();
                //接受服务端消息
                iosocket.on('GPSCoor', function(data) {
                    console.log(data);
                    var id=data.id.int;
                    var feature;
                    if(data.tg_op=='delete'){
                        feature=gpsSource.getFeatureById(id);
                        if(feature)
                            gpsSource.removeFeature(feature);//删除点
                    }
                    else{
                        var geom=data.geom.string;
                        geom=wktform.readGeometry(geom);
                        geom.transform('EPSG:4326','EPSG:3857');
                        feature=gpsSource.getFeatureById(id);
                        if(feature)
                            feature.setGeometry(geom);//修改已有点
                        else{
                            feature=new ol.Feature({
                                geometry:geom
                            });
                            feature.setId(id);
                            gpsSource.addFeature(feature);//地图新增点
                        }
                    }
                });
            }
            
           
    </script>
</head>
<body onload="init()">
    <div id="map"></div>
</body>
</html>

三 测试成果

3.1 新增

mcsas=# insert into gps(name,geom) values ('opy','Point(118 31.5)');
INSERT 0 1
mcsas=# insert into gps(name,geom) values ('ty','Point(117 30.5)');
INSERT 0 1
新增成果.png

3.2 修改

mcsas=# update gps set geom='Point(115 40)' where name='opy';
UPDATE 1
修改成果.png

3.3 删除

mcsas=# delete from gps where name='opy';
DELETE 1

删除结果.png

四 总结

BottledWater-PG主要作用是将pg库中的表的增删改的消息都发往了kafka,应用程序并没有直接连接数据库,而是直接去消费kafka的消息。在表发生insert,update,delete能获取消息,但是truncate table并未向kafka生成消息,不知是否是我哪里遗漏。
  作者之前曾使用pg自带的notify与listen实现异步消息发送,该方法借助了表的触发器实现。应用程序是直连数据库且数据增删改都会走触发器。
  匆忙中,作者并未对比两者之间孰优孰劣,但一个直连库,一个间接消费,在不同需求中可选择一个比较符合要求的方案而加以应用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,563评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,263评论 25 707
  • 发现 关注 消息 iOS 第三方库、插件、知名博客总结 作者大灰狼的小绵羊哥哥关注 2017.06.26 09:4...
    肇东周阅读 11,982评论 4 60
  • 你挂念我的念头 澎湃着我的灵魂 本来我正在一滴露水里打坐 只好腾挪到一片积雨云之上 这是我们意识交媾的软床 你又想...
    楊孜阅读 423评论 0 3
  • 殇感记语阅读 212评论 0 2