数据环境
服务器 | 端口 | 服务 | 类型 |
---|---|---|---|
192.168.1.100 | 5432 | pipelinedb、kafka | 数据提供者 |
192.168.1.110 | 5433 | postgres | 数据订阅者 |
pglogical安装
192.168.1.110 安装过程
//下载rpm包
yum install http://packages.2ndquadrant.com/pglogical/yum-repo-rpms/pglogical-rhel-1.0-3.noarch.rpm
//安装扩展包
yum install postgresql96-pglogical
修改pg 配置文件
vim postgresql.conf
//修改如下
wal_level = 'logical' //必须打开
max_worker_processes = 10
max_replication_slots = 10
max_wal_senders = 10
shared_preload_libraries = 'pglogical' //必须打开
完成修改后重启postgresql 服务
pg_ctl -D $PGDATA -l $PGDATA/logs/pg96.log restart
连接安装pglogical扩展
psql -h 192.168.1.110 -p5433
#create database demo
#\c demo
#create extension pglogical
#\dx;
192.168.1.100 安装过程
pipelinedb环境基于postgresql9.5 因此需要先下载pg9.5版本
//下载rpm包
yum install http://packages.2ndquadrant.com/pglogical/yum-repo-rpms/pglogical-rhel-1.0-3.noarch.rpm
yum localinstall https://download.postgresql.org/pub/repos/yum/9.5/redhat/rhel-7-x86_64/pgdg-centos95-9.5-2.noarch.rpm
yum -y install postgresql95-server postgresql95-contrib postgresql95 postgresql95-devel postgresql95 postgresql95-pglogical
#安装完成后复制postgresql9.5目录中的内容到pipelinedb安装目录中
mv /usr/pgsql-9.5/lib/* /usr/lib/pipelinedb/lib/pipelinedb/
mv /usr/pgsql-9.5/bin/* /usr/lib/pipelinedb/bin/
mv /usr/pgsql-9.5/share/extension/* /usr/lib/pipelinedb/share/pipelinedb/extension/
修改pipelinedb配置文件
vim pipelinedb.conf
listen_addresses = '*'
max_connections = 1000
port = 5432
#红色的必须打开注释
wal_level = 'logical'
max_worker_processes = 10
max_replication_slots = 10
max_wal_senders = 10
shared_preload_libraries = 'pglogical'
vim pg_hba.conf
local replication postgres trust
host replication postgres 127.0.0.1/32 trust
host replication postgres ::1/128 trust
#增加服务器110地址的同步认证
host replication postgres 192.168.1.110/32 trust
重启pipelinedb数据库
#根据实际安装目录
pipeline-ctl -D /data/pipelinedb/data -l logfile restart
连接安装pglogical扩展
psql -h 192.168.1.100 -p5432
#create database pipeline
#\c pipeline
#create extension pglogical
#\dx;
pipeline_kafka集成
git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure --prefix=/usr
make
sudo make install
修改pipelinedb配置文件
vim pipelinedb.conf
shared_preload_libraries = 'pglogical, pipeline_kafka'
重启生效
#根据实际安装目录
pipeline-ctl -D /data/pdb -l /data/logs/postgres/logfile restart
连接安装pipeline_kafka扩展
psql -h 192.168.1.100 -p5432 pipeline
#create extension pipeline_kafka
#\dx;
List of installed extensions
Name | Version | Schema | Description
----------------+---------+----------------+----------------------------------------
pglogical | 2.0.1 | pglogical | PostgreSQL Logical Replication
pipeline_kafka | 0.9.3 | pipeline_kafka | PipelineDB extension for Kafka support
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
安装成功
实例操作
连接上pipelinedb终端
psql -h 192.168.1.100 -p5432 pipeline
#配置kafka broker
# SELECT pipeline_kafka.add_broker('localhost:9092');
#创建数据流
#create stream stat_stream (monitor_id int, task_id int, time_slot timestamp, resp_time int);
#创建视图
CREATE continuous VIEW stats_view WITH(pk = 'unique_key') AS SELECT
monitor_id,
task_id,
date_trunc('hour', time_slot) AS shour,
SUM(resp_time) AS sum_resp,
md5(
monitor_id || '_' || task_id || '_' || date_trunc('hour', time_slot)
) AS unique_key
FROM
stat_stream
GROUP BY
monitor_id,
task_id,
shour,
unique_key;
pipeline=# \d+
List of relations
Schema | Name | Type | Owner | Size | Description
--------+---------------------------+----------+----------+------------+-------------
public | stats_view_mrel | table | postgres | 8192 bytes |
public | stats_view_seq | sequence | postgres | 8192 bytes |
(2 rows)
#开始消费kafka消息
# topic是my-topic,连接PipelineDB Stream名是stat_stream,消息类型是text,消息以英文逗号分割。
#SELECT
pipeline_kafka.consume_begin(
'my-topic',
'stat_stream',
format := 'text',
DELIMITER := ',',
QUOTE := NULL,
ESCAPE := NULL,
batchsize := 1000,
maxbytes := 32000000,
parallelism := 1,
start_offset := NULL
);
kafka生产测试
进入kafka安装目录
./bin/kafka-console-producer.sh --broker-list 192.168.1.100:9092 --topic my-topic
>1,2,'2017-09-22 10:00:00',30
pipelinedb查看数据流
pipeline=# select * from stats_view;
monitor_id | task_id | shour | sum_resp
------------+---------+---------------------+----------
1 | 2 | 2017-09-22 10:00:00 | 30
(1 row)
kafka 停止写流
psql -h 192.168.1.100 -p5432 pipeline
#SELECT pipeline_kafka.consume_end('my-topic', 'stat_stream');