Streamsets实时数据同步

背景

在工作中,各产品线经常会有实时数据分析、跨中心查询的需求,而我们使用的Postgresql数据库难以实现跨库查询,所以,我们将Postgresql的数据实时同步到Snappydata中,来实现跨中心查询、实时数据分析的需求。

这里要说明的一点,运维Snappydata是非常痛苦的一件事,它的社区很不活跃,遇到问题,基本上都是要自己解决,并且它的语法并没有常用数据库那么丰富。

目前,我们正在考虑,将数据实时同步到Postgresql指定的库中,以schema加以区分数据库(数据来源)或产品线。

管道介绍

我们的实时同步主要是将Postgresql的数据同步到Snappydata中,而你也可以使用Streamsets实现从Mysql到Mysql的实时同步等等,Streamsets的组件是非常丰富,可以说足以满足常用的需求。这里,我先介绍第一版实时同步管道,但它会存在一些问题,这一版只能算是完成了实时数据同步的功能。
前提条件:已经安装了wal组件来采集Postgresql数据

实时数据同步

实时数据同步管道(第一版)

管道组件介绍:

1.Postgresql CDC Client

用途:连接数据库,采集Wal日志

参数配置:使用组件默认参数,填写必要的Replication Slot、JDBC连接、用户名/密码

需要指出的是,如果你想立刻看到数据变化,请将Max Batch Size设置为1(默认为100)

单批次最大采集数量

这里,我们发现Streamsets-3.9.1有以下Bug、问题,

1、高并发下,会存在数据丢失的问题,该问题我已经向Streamsets公司指出,目前他们正在完善中,估计会在3.16版本中修复;jira链接https://issues.streamsets.com/browse/SDC-13269#add-comment

2、我们测试3.9.1版本中该组件过滤表的功能,并没有起到过滤作用,因此,我们通过写python代码的方式实现过滤表的功能

3、该组件并不能采集DDL语句,只能采集insert、update、delete操作

4、设置项中Poll Interval 要小于Postgresql配置文件(postgres.conf) 的** wal_sender_timeout**

2. Expression Evaluator

用途:添加过滤表单

说明:可以不使用该组件,可以在③中写python脚本来过滤

3.Jython

用途:写python来解析wal日志,将wal日志处理成json形式

说明:

原始wal日志update/insert:

{
     "xid": 1055891831,
     "nextlsn": "130/6FC8A230",
     "timestamp": "2020-01-08 15:00:[14.243564+08]",
     "change": [{
         "kind": "update",
         "schema": "********",
         "table": "********",
         "columnnames": ["id", "create_user", "modify_user", "create_time", "modify_time", "app_id", "tenant_id", "deleted", "animal_id", "property_id", "property_val"],
         "columntypes": ["bigint", "character varying(500)", "character varying(500)", "timestamp(6) without time zone", "timestamp(6) without time zone", "bigint", "bigint", "boolean", "bigint", "bigint", "character varying(500)"],
         "columnvalues": [664478959718834178, "1182483438204788738", "1182483438204788738", "2020-01-08 14:42:[09.419]", "2020-01-08 15:00:[14.228968]", 502, 627092828622684160, false, 664478959546867712, 1158021187370930177, "2020-01-08 15:00:14"],
         "oldkeys": {
             "keynames": ["id"],
             "keytypes": ["bigint"],
             "keyvalues": [664478959718834178]
         }
     }, {
         "kind": "insert",
         "schema": "********",
         "table": "********",
         "columnnames": ["id", "create_user", "modify_user", "create_time", "modify_time", "app_id", "tenant_id", "deleted", "event_code", "event_date", "animal_id", "event_content", "org_id", "total_parity"],
         "columntypes": ["bigint", "character varying(500)", "character varying(500)", "timestamp(6) without time zone", "timestamp(6) without time zone", "bigint", "bigint", "boolean", "character varying(32)", "timestamp(6) without time zone", "bigint", "jsonb", "bigint", "integer"],
         "columnvalues": [664483509762727936, "1182483438204788738", "1182483438204788738", "2020-01-08 15:00:[14.234]", "2020-01-08 15:00:[14.234]", 503, 627092828622684160, false, "CR", "2020-01-08 15:00:14", 664478959546867712, "{\"reason\": \"4\", \"remark\": \"App登记\", \"isCancel\": 0}", 642422751306448896, null]
     }]
 }

原始wal日志delete:

{
    "xid": 1055894606,
    "nextlsn": "130/700E9880",
    "timestamp": "2020-01-08 15:18:53.051471+08",
    "change": [{
        "kind": "delete",
        "schema": "*********",
        "table": "*********",
        "oldkeys": {
            "keynames": ["id"],
            "keytypes": ["bigint"],
            "keyvalues": [631496870811664384]
        }
    }]
}

jython中python脚本代码:

for record in records:
  try:
    # 过滤表class和表student
    tables = ['class', 'student']
    if record.value['change']['table'] in tables:
      changes=record.value['change']
      for change in changes: 
        record.value.clear()
        kind=change['kind']
        if kind != 'delete':
          columnnames_list=change['columnnames']
          columnvalues_list=change['columnvalues']
        else:
          columnnames_list=change['oldkeys']['keynames']
          columnvalues_list=change['oldkeys']['keyvalues']            
      
        for idx in range(len(columnnames_list)):
          record.value[columnnames_list[idx]]=columnvalues_list[idx]
      
        record.value['kind']=kind
        record.value['table']=change['table']
        record.value['schema']=change['schema']
        record.value['database']='*********'
      
        output.write(record)
  except Exception as e:
    # Send record to error
    error.write(record, str(e))

处理后的数据:(这里,并没有截取原始wal日志的那条数据,只是来说明处理后的格式)

 新增、更新记录
{"id":645986949249966087,"create_user":"1191899847279063041","modify_user":"1191899847279063041","create_time":"2019-11-18 14:01:[30.558","modify_time":"2020-01-08 13:54:[55.789116]","app_id":502,"tenant_id":626804036418404352,"org_id":641585797362876416,"deleted":false,"stage":"nursery","start_date":"2019-10-01 14:00:37","end_date":"2020-01-08 13:54:14","qty":1,"wgt":[33.33,],"start_wgt_date":null,"end_wgt_date":null,"start_wgt":[0.0,],"end_wgt":[0.0,],"pre_id":0,"kind":"update","table":"********","schema":"********"}
 
 删除
 {"id":618383552928546816,"kind":"delete","table":"********","schema":"********"}

下面的截图是管道的数据快照,左边为原始的wal日志数据,右边为Jython组件处理后输出的数据。


管道数据快照

4.Stream Selector

用途:分流
说明:由于Streamsets的JDBC Producer只能进行insert/update/delete其中的一种操作,所以,你需要三个JDBC Producer来操作数据库


分流

5.JDBC Producer

用途:操作数据库
说明:只能进行一种操作


操作数据库

至此,你可以简单实现实时数据同步任务,如果,你不想自己配置,这里,我提供了管道的json文件,你直接导入到Streamsets,修改参数就可以使用了

百度云链接(还未上传)

这一版本存在以下问题:
每个管道占用一个postgresql slot,如果有其他管道使用该数据库的数据(如:实时宽表),则需要另外启动一个slot来采集数据,这样会降低postgresql的性能
所以,我将wal日志采集到kafka中,可以让多个管道进行消费

未完,后续更新

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,783评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,396评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,834评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,036评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,035评论 5 362
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,242评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,727评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,376评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,508评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,415评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,463评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,140评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,734评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,809评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,028评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,521评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,119评论 2 341

推荐阅读更多精彩内容