OGG实时同步数据到kafka配置

一、源端(oracle数据库端)---同这篇文章源端配置http://www.jianshu.com/p/53882229b70e

#Extract进程
#一定要记得同步之前要开启表的全补充日志
#alter table tb_name add supplemental log data (all) columns;
GGSCI (zwjfdb3) 7> view param EZWJFBOR

EXTRACT EZWJFBOR
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")
SETENV (ORACLE_SID = "zwjfdb3")
--捕获 truncate 操作
gettruncates
--定义discardfile文件位置,如果处理中有记录出错会写入到此文件中
DISCARDFILE ./dirrpt/ezwjfbor.dsc, APPEND, MEGABYTES 1024
--动态解析表名
DYNAMICRESOLUTION
--获取更新之前数据
GETUPDATEBEFORES
--当抽取进程遇到一个没有使用的字段时只生成一个警告,进程会继续执行而不会被异常终止(abend)
DBOPTIONS  ALLOWUNUSEDCOLUMN
--每隔30分钟报告一次从程序开始到现在的抽取进程或者复制进程的事物记录数,并汇报进程的统计信息
REPORTCOUNT EVERY 30 MINUTES, RATE
--每隔3分钟检查一下大事务,超过2小时还没结束的进行报告
WARNLONGTRANS 2h,CHECKINTERVAL 3m
--不会从闪回日志中获取数据
FETCHOPTIONS NOUSESNAPSHOT
USERID xxxxxx,PASSWORD xxxxxx
EXTTRAIL ./dirdat/zb
TABLE xx.xx;
TABLE xx.xx;

#添加抽取进程
GGSCI (zwjfdb3) 11> add extract EZWJFBOR,TRANLOG, BEGIN NOW
EXTRACT added.
#定义trail文件
GGSCI (zwjfdb3) 12> ADD EXTTRAIL ./dirdat/zb,EXTRACT EZWJFBOR, MEGABYTES 200
EXTTRAIL added.

#pump extract进程
GGSCI (zwjfdb3) 8> view param PZWJFBOR
EXTRACT PZWJFBOR
SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
PASSTHRU
DYNAMICRESOLUTION
RMTHOST xx.xx.xx.xx,MGRPOT 7809 
RMTTRAIL ./dirdat/zb
TABLE xx.xx;
TABLE xx.xx;
#添加pump捕获组
GGSCI (zwjfdb3) 23> ADD EXTRACT PZWJFBOR,EXTTRAILSOURCE ./dirdat/zb
EXTRACT added.
#定义pump trail文件
GGSCI (zwjfdb3) 25> ADD RMTTRAIL ./dirdat/zb,EXTRACT PZWJFBOR, MEGABYTES 200
RMTTRAIL added.

#启动进程
GGSCI (zwjfdb3) 8> start EXTRACT EZWJFBOR
GGSCI (zwjfdb3) 8> start EXTRACT PZWJFBOR
GGSCI (zwjfdb3) 9> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
EXTRACT     RUNNING     EZWJFBOR    00:00:00      00:00:08    
EXTRACT     RUNNING     PZWJFBOR    00:00:00      00:00:16 

#传递表结构
GGSCI (zwjfdb3) 4> view param defgen
DEFSFILE dirdef/source.def, PURGE
USERID xxxxxx, PASSWORD xxxx
TABLE xx.xx;
TABLE xx.xx;
[oracle@zwjfdb3 12.2]$ ./defgen paramfile dirprm/defgen.prm 

***********************************************************************
        Oracle GoldenGate Table Definition Generator for Oracle
 Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258
   Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 16:58:29
 
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.


                    Starting at 2017-12-05 16:21:03
***********************************************************************

Operating System Version:
Linux
Version #1 SMP Thu Feb 23 03:04:39 UTC 2017, Release 3.10.0-514.6.2.el7.x86_64
Node: zwjfdb3
Machine: x86_64
                         soft limit   hard limit
Address Space Size   :    unlimited    unlimited
Heap Size            :    unlimited    unlimited
File Size            :    unlimited    unlimited
CPU Time             :    unlimited    unlimited

Process id: 375

***********************************************************************
**            Running with the following parameters                  **
***********************************************************************
把./dirdef/source.def文件拷贝到目标端的./dirdef目录下

二、目标端(kafka)

#java必须为1.8
[root@bigdata01 ~]$ java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
[root@bigdata01 ~]$ 
#创建ogg管理用户,并更改ogg安装路径权限
[root@bigdata01 ~]$ groupadd ogg
[root@bigdata01 ~]$ useradd -g ogg ogg
[root@bigdata01 ~]$ chown -R ogg:ogg  /opt/ogg
#上传并解压ggs_Adapters_Linux_x64.tar到相应目录
[ogg@bigdata01 ogg]$ pwd
/opt/ogg
[ogg@bigdata01 ogg]$ ll
total 582548
drwxr-xr-x 6 ogg ogg     4096 Jun  7  2016 AdapterExamples
-rw-r----- 1 ogg ogg      426 Oct 15  2010 bcpfmt.tpl
-rw-r----- 1 ogg ogg     1725 Oct 15  2010 bcrypt.txt
-rwxrwxr-x 1 ogg ogg  8557335 May  1  2016 cachefiledump
-rwxrwxr-x 1 ogg ogg  8730645 May  1  2016 checkprm
-rwxr-x--- 1 ogg ogg  9567306 May  1  2016 convchk
-rwxrwxr-x 1 ogg ogg 15019428 May  1  2016 convprm
-rw-r----- 1 ogg ogg      159 Oct 15  2010 db2cntl.tpl
drwxr-x--- 2 ogg ogg     4096 Dec  5 14:05 dirchk
drwxr-x--- 2 ogg ogg     4096 Dec  5 14:05 dircrd

#配置环境变量
[ogg@bigdata01 ~]$ cat ~/.bash_profile 
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
    . ~/.bashrc
fi

# User specific environment and startup programs

export JAVA_HOME=/opt/java8
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=/opt/java8/jre/lib/amd64/libjsig.so:/opt/java8/jre/lib/amd64/server/libjvm.so:/opt/java8/jre/lib/amd64/server:/opt/java8/jre/lib/amd64
PATH=$PATH:$HOME/bin:$JAVA_HOME:$JAVA_HOME/bin:$OGG_HOME

export PATH
[ogg@bigdata01 ~]$ 

#登录并用命令创建ogg所需文件夹
[ogg@bigdata01 ogg]$ ./ggsci 

Oracle GoldenGate Command Interpreter
Version 12.2.0.1.160419 OGGCORE_12.2.0.1.0OGGBP_PLATFORMS_160430.1401
Linux, x64, 64bit (optimized), Generic on Apr 30 2016 16:21:34
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2016, Oracle and/or its affiliates. All rights reserved.



GGSCI (bigdata01) 1> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     STOPPED                                           


GGSCI (bigdata01) 2> create subdirs

Creating subdirectories under current directory /opt/ogg

Parameter files                /opt/ogg/dirprm: created
Report files                   /opt/ogg/dirrpt: created
Checkpoint files               /opt/ogg/dirchk: created
Process status files           /opt/ogg/dirpcs: created
SQL script files               /opt/ogg/dirsql: created
Database definitions files     /opt/ogg/dirdef: created
Extract data files             /opt/ogg/dirdat: created
Temporary files                /opt/ogg/dirtmp: created
Credential store files         /opt/ogg/dircrd: created
Masterkey wallet files         /opt/ogg/dirwlt: created
Dump files                     /opt/ogg/dirdmp: created

#配置mgr
GGSCI (bigdata01) 3> view param MGR
PORT 7809
DYNAMICPORTLIST 7810-7909
--定期清理dirdat路径下的本地队列,保留期限10天,过期后自动删除。
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

#开启mgr
GGSCI (bigdata01) 4> start mgr
Manager started.


GGSCI (bigdata01) 5> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING  

#配置kafka
[ogg@bigdata01 big-data]$ cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
[ogg@bigdata01 big-data]$ cd $OGG_HOME/dirprm/
[ogg@bigdata01 big-data]$ vi kafka.props 
[ogg@bigdata01 big-data]$ cat kafka.props 
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName =ogg_zwjfborrower
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode =op
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb


goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/data/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar 

#配置custom_kafka_producer.properties
[ogg@bigdata01 dirprm]$ cat custom_kafka_producer.properties 
bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000

#定义replication
GGSCI (bigdata01) 2> view param RZWJFBOR
REPLICAT RZWJFBOR
Setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
SOURCEDEFS dirdef/source.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP ZWJFBORROWER.*, TARGET ZWJFBORROWER.*;

#指定Trail文件
GGSCI (bigdata01) 2> add replicat RZWJFBOR, exttrail ./dirdat/zb
#启动replicat进程
GGSCI (bigdata01) 2>start replicat RZWJFBOR
GGSCI (bigdata01) 3> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
REPLICAT    RUNNING     RZWJFBOR    00:00:00      00:00:08    

三、测试数据是否到kafka

[root@bigdata01 ~]# /data/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/bin/kafka-console-consumer  --zookeeper bigdata01:2181 --topic ogg_zwjfborrower --from-beginning 
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容