一、源端(oracle数据库端)---同这篇文章源端配置http://www.jianshu.com/p/53882229b70e
#Extract进程
#一定要记得同步之前要开启表的全补充日志
#alter table tb_name add supplemental log data (all) columns;
GGSCI (zwjfdb3) 7> view param EZWJFBOR
EXTRACT EZWJFBOR
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")
SETENV (ORACLE_SID = "zwjfdb3")
--捕获 truncate 操作
gettruncates
--定义discardfile文件位置,如果处理中有记录出错会写入到此文件中
DISCARDFILE ./dirrpt/ezwjfbor.dsc, APPEND, MEGABYTES 1024
--动态解析表名
DYNAMICRESOLUTION
--获取更新之前数据
GETUPDATEBEFORES
--当抽取进程遇到一个没有使用的字段时只生成一个警告,进程会继续执行而不会被异常终止(abend)
DBOPTIONS ALLOWUNUSEDCOLUMN
--每隔30分钟报告一次自进程启动以来抽取进程或复制进程已处理的记录数;RATE 选项同时报告处理速率
REPORTCOUNT EVERY 30 MINUTES, RATE
--每隔3分钟检查一次长事务,对超过2小时仍未提交的事务进行告警报告
WARNLONGTRANS 2h,CHECKINTERVAL 3m
--需要回表取数(fetch)时,不使用闪回查询(从undo读取快照),而是直接从表中读取当前数据
FETCHOPTIONS NOUSESNAPSHOT
USERID xxxxxx,PASSWORD xxxxxx
EXTTRAIL ./dirdat/zb
TABLE xx.xx;
TABLE xx.xx;
#添加抽取进程
GGSCI (zwjfdb3) 11> add extract EZWJFBOR,TRANLOG, BEGIN NOW
EXTRACT added.
#定义trail文件
GGSCI (zwjfdb3) 12> ADD EXTTRAIL ./dirdat/zb,EXTRACT EZWJFBOR, MEGABYTES 200
EXTTRAIL added.
#pump extract进程
GGSCI (zwjfdb3) 8> view param PZWJFBOR
EXTRACT PZWJFBOR
SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
PASSTHRU
DYNAMICRESOLUTION
RMTHOST xx.xx.xx.xx,MGRPORT 7809
RMTTRAIL ./dirdat/zb
TABLE xx.xx;
TABLE xx.xx;
#添加pump进程(以本地trail ./dirdat/zb 作为数据源)
GGSCI (zwjfdb3) 23> ADD EXTRACT PZWJFBOR,EXTTRAILSOURCE ./dirdat/zb
EXTRACT added.
#定义pump trail文件
GGSCI (zwjfdb3) 25> ADD RMTTRAIL ./dirdat/zb,EXTRACT PZWJFBOR, MEGABYTES 200
RMTTRAIL added.
#启动进程
GGSCI (zwjfdb3) 8> start EXTRACT EZWJFBOR
GGSCI (zwjfdb3) 8> start EXTRACT PZWJFBOR
GGSCI (zwjfdb3) 9> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EZWJFBOR 00:00:00 00:00:08
EXTRACT RUNNING PZWJFBOR 00:00:00 00:00:16
#在源端用defgen生成表定义文件,供目标端replicat通过SOURCEDEFS引用
GGSCI (zwjfdb3) 4> view param defgen
DEFSFILE dirdef/source.def, PURGE
USERID xxxxxx, PASSWORD xxxx
TABLE xx.xx;
TABLE xx.xx;
[oracle@zwjfdb3 12.2]$ ./defgen paramfile dirprm/defgen.prm
***********************************************************************
Oracle GoldenGate Table Definition Generator for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 16:58:29
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
Starting at 2017-12-05 16:21:03
***********************************************************************
Operating System Version:
Linux
Version #1 SMP Thu Feb 23 03:04:39 UTC 2017, Release 3.10.0-514.6.2.el7.x86_64
Node: zwjfdb3
Machine: x86_64
soft limit hard limit
Address Space Size : unlimited unlimited
Heap Size : unlimited unlimited
File Size : unlimited unlimited
CPU Time : unlimited unlimited
Process id: 375
***********************************************************************
** Running with the following parameters **
***********************************************************************
把./dirdef/source.def文件拷贝到目标端的./dirdef目录下
二、目标端(kafka)
#java必须为1.8
[root@bigdata01 ~]$ java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
[root@bigdata01 ~]$
#创建ogg管理用户,并把ogg安装路径的属主改为该用户
[root@bigdata01 ~]$ groupadd ogg
[root@bigdata01 ~]$ useradd -g ogg ogg
[root@bigdata01 ~]$ chown -R ogg:ogg /opt/ogg
#上传并解压ggs_Adapters_Linux_x64.tar到相应目录
[ogg@bigdata01 ogg]$ pwd
/opt/ogg
[ogg@bigdata01 ogg]$ ll
total 582548
drwxr-xr-x 6 ogg ogg 4096 Jun 7 2016 AdapterExamples
-rw-r----- 1 ogg ogg 426 Oct 15 2010 bcpfmt.tpl
-rw-r----- 1 ogg ogg 1725 Oct 15 2010 bcrypt.txt
-rwxrwxr-x 1 ogg ogg 8557335 May 1 2016 cachefiledump
-rwxrwxr-x 1 ogg ogg 8730645 May 1 2016 checkprm
-rwxr-x--- 1 ogg ogg 9567306 May 1 2016 convchk
-rwxrwxr-x 1 ogg ogg 15019428 May 1 2016 convprm
-rw-r----- 1 ogg ogg 159 Oct 15 2010 db2cntl.tpl
drwxr-x--- 2 ogg ogg 4096 Dec 5 14:05 dirchk
drwxr-x--- 2 ogg ogg 4096 Dec 5 14:05 dircrd
#配置环境变量
[ogg@bigdata01 ~]$ cat ~/.bash_profile
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
# User specific environment and startup programs
export JAVA_HOME=/opt/java8
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=/opt/java8/jre/lib/amd64/libjsig.so:/opt/java8/jre/lib/amd64/server/libjvm.so:/opt/java8/jre/lib/amd64/server:/opt/java8/jre/lib/amd64
PATH=$PATH:$HOME/bin:$JAVA_HOME:$JAVA_HOME/bin:$OGG_HOME
export PATH
[ogg@bigdata01 ~]$
#登录并用命令创建ogg所需文件夹
[ogg@bigdata01 ogg]$ ./ggsci
Oracle GoldenGate Command Interpreter
Version 12.2.0.1.160419 OGGCORE_12.2.0.1.0OGGBP_PLATFORMS_160430.1401
Linux, x64, 64bit (optimized), Generic on Apr 30 2016 16:21:34
Operating system character set identified as UTF-8.
Copyright (C) 1995, 2016, Oracle and/or its affiliates. All rights reserved.
GGSCI (bigdata01) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER STOPPED
GGSCI (bigdata01) 2> create subdirs
Creating subdirectories under current directory /opt/ogg
Parameter files /opt/ogg/dirprm: created
Report files /opt/ogg/dirrpt: created
Checkpoint files /opt/ogg/dirchk: created
Process status files /opt/ogg/dirpcs: created
SQL script files /opt/ogg/dirsql: created
Database definitions files /opt/ogg/dirdef: created
Extract data files /opt/ogg/dirdat: created
Temporary files /opt/ogg/dirtmp: created
Credential store files /opt/ogg/dircrd: created
Masterkey wallet files /opt/ogg/dirwlt: created
Dump files /opt/ogg/dirdmp: created
#配置mgr
GGSCI (bigdata01) 3> view param MGR
PORT 7809
DYNAMICPORTLIST 7810-7909
--定期清理dirdat路径下已被所有进程处理完(USECHECKPOINTS)的本地队列文件,至少保留7天,过期后自动删除。
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
#开启mgr
GGSCI (bigdata01) 4> start mgr
Manager started.
GGSCI (bigdata01) 5> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
#配置kafka
[ogg@bigdata01 big-data]$ cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
[ogg@bigdata01 big-data]$ cd $OGG_HOME/dirprm/
[ogg@bigdata01 big-data]$ vi kafka.props
[ogg@bigdata01 big-data]$ cat kafka.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName =ogg_zwjfborrower
gg.handler.kafkahandler.format =json
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode =op
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/data/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
#配置custom_kafka_producer.properties
[ogg@bigdata01 dirprm]$ cat custom_kafka_producer.properties
bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
#定义replication
GGSCI (bigdata01) 2> view param RZWJFBOR
REPLICAT RZWJFBOR
Setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
SOURCEDEFS dirdef/source.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP ZWJFBORROWER.*, TARGET ZWJFBORROWER.*;
#添加replicat进程,并指定其读取的trail文件
GGSCI (bigdata01) 2> add replicat RZWJFBOR, exttrail ./dirdat/zb
#启动replicat进程
GGSCI (bigdata01) 2>start replicat RZWJFBOR
GGSCI (bigdata01) 3> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING RZWJFBOR 00:00:00 00:00:08
三、测试数据是否到kafka
[root@bigdata01 ~]# /data/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/bin/kafka-console-consumer --zookeeper bigdata01:2181 --topic ogg_zwjfborrower --from-beginning