简述
PostgreSQL 10.0以后的版本在logical decoding(pg9.4 开始支持)的基础上做了进一步的开发,可以直接通过命令的方式在数据库实现表级数据的同步
限制及特性
1、只支持普通表生效,不支持序列、视图、物化视图、外部表、分区表和大对象
2、只支持普通表的DML(INSERT、UPDATE、DELETE)操作,不支持truncate、DDL操作
3、需要同步的表必须设置REPLICA IDENTITY 不能为noting(默认值是default),同时表中必须包含主键,否则delete和update报错
4、一个publisher可以包含一张或多张表,一张表可以有一个或多个publishers
5、一个发布者可以有多个订阅者订阅,一个订阅者也可以同时订阅多个发布者,在同一个数据库下订阅者不能对同一个发布者的表重复订阅(避免数据冲突)
6、逻辑复制不同于流复制,不是严格的主从关系,订阅者端的普通表依然可以进行增删改操作
7、同步表的表结构需要在发布者和订阅者两边保持一致(列的顺序允许不一样,但是列对应的数据类型必须一致)
8、如果订阅者端的数据被误删,想要从发布者重新copy同步表的数据,只能以重建同步表所在的订阅者的方式来实现
环境搭建
配置参数
修改postgresql.conf数据库参数文件(修改这些参数需要重启数据库)
- 发布者端设置
设置wal_level级别为logical:wal_level = logical
设置max_wal_senders,此参数值要不小于max_replication_slots的参数值,默认值是10
设置max_replication_slots,此参数值不少于subscriptions的个数,默认值是10 - 订阅者端设置
设置wal_level级别为logical:wal_level = logical
设置max_logical_replication_workers,此参数值不少于订阅者的个数,默认是4
设置max_worker_processes,此参数值不少于max_logical_replication_workers值+1
在pg_hba.conf添加白名单(根据真实情况自行限制网段)
host all repuser 0.0.0.0/0 md5
创建专门用于逻辑复制的超户(建议使用uuid作为密码)
create user repuser superuser login password 'repuser1234';
创建发布者
publisher是逻辑复制的起点
--查看pub_db数据库的发布者
pub_db=# \dRp
List of publications
Name | Owner | All tables | Inserts | Updates | Deletes
------+-------+------------+---------+---------+---------
(0 rows)
--在pub_db数据库上创建名为mypub的发布者
pub_db=# CREATE PUBLICATION mypub;
CREATE PUBLICATION
pub_db=#
pub_db=# \dRp
List of publications
Name | Owner | All tables | Inserts | Updates | Deletes
-------+----------+------------+---------+---------+---------
mypub | postgres | f | t | t | t
(1 row)
--查看mypub发布的详细信息
pub_db=# \dRp+
Publication mypub
Owner | All tables | Inserts | Updates | Deletes
----------+------------+---------+---------+---------
postgres | f | t | t | t
(1 row)
创建订阅者
subscriber是逻辑复制的下游
--查看sub_db数据库下的订阅者
sub_db=# \dRs
List of subscriptions
Name | Owner | Enabled | Publication
------+-------+---------+-------------
(0 rows)
--在sub_db数据库上创建名为mysub的订阅者
sub_db=# CREATE SUBSCRIPTION mysub CONNECTION 'dbname=pub_db host=pgtest1 user=repuser password=repuser1234 port=6432' PUBLICATION mypub;
NOTICE: created replication slot "mysub" on publisher
CREATE SUBSCRIPTION
sub_db=#
sub_db=#
sub_db=# \dRs
List of subscriptions
Name | Owner | Enabled | Publication
-------+----------+---------+-------------
mysub | postgres | t | {mypub}
(1 row)
--查看订阅者mysub的详细信息
sub_db=# \dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Synchronous commit | Conninfo
-------+----------+---------+-------------+--------------------+-------------------------------------------------------------------------------------
mysub | postgres | t | {mypub} | off | dbname=pub_db host=pgtest1 user=repuser password=repuser1234 port=6432
(1 row)
添加需要同步的表
发布者端
--创建表
pub_db=# create table logical_tb1(id int primary key,col1 varchar(8),col2 numeric(10,2),col3 jsonb,col4 hstore,col5 timestamptz default now(),col6 int[],col7 ltree);
CREATE TABLE
pub_db=# insert into logical_tb1(id,col1) select generate_series(1,2000),'tester';
INSERT 0 2000
--添加到发布者mypub
pub_db=# alter publication mypub add table logical_tb1;
ALTER PUBLICATION
--查看发布者的详细信息
pub_db=# \dRp+ mypub
Publication mypub
Owner | All tables | Inserts | Updates | Deletes
----------+------------+---------+---------+---------
postgres | f | t | t | t
Tables:
"public.logical_tb1"
订阅者端
--创建相同的表
sub_db=# create table logical_tb1(id int primary key,col1 varchar(8),col2 numeric(10,2),col3 jsonb,col4 hstore,col5 timestamptz default now(),col6 int[],col7 ltree);
CREATE TABLE
--刷新一下订阅者
sub_db=# ALTER SUBSCRIPTION mysub REFRESH PUBLICATION;
ALTER SUBSCRIPTION
--查询数据是否已经同步过来
sub_db=# select count(*) from logical_tb1;
count
-------
2000
(1 row)
双向同步
这个是根据逻辑复制表级复制的原理,在pub_db上创建一个发布者x_pub,在sub_db上创建一个订阅者x_sub,来实现表x的数据从pub_db数据库同步到sub_db数据库的x表;
同时在sub_db上创建一个发布者y_pub,在pub_db上创建一个订阅者y_sub,来实现表y的数据从sub_db数据库同步到pub_db数据库的y表
具体的实现方式参考步骤:创建发布者、创建订阅者、添加同步表
约束条件表
对于没有任何约束条件的普通表实现逻辑同步很简单,直接将表添加到发布者即可,但是有约束条件的表如何实现逻辑同步呢?
条件约束
先给出结论:对于条件符合约束的数据在订阅端不影响同步,不符合条件约束的数据在订阅端会同步报错
--在sub_db上的同步表添加一个check 约束
sub_db=# alter table logical_tb1 add constraint col1_check check(col1 = 'test');
ALTER TABLE
sub_db=# \d logical_tb1
Table "public.logical_tb1"
Column | Type | Collation | Nullable | Default
--------+--------------------------+-----------+----------+---------
id | integer | | not null |
col1 | character varying(8) | | |
col2 | numeric(10,2) | | |
col3 | json | | |
col4 | hstore | | |
col5 | timestamp with time zone | | | now()
col6 | integer[] | | |
col7 | ltree | | |
Indexes:
"logical_tb1_pkey" PRIMARY KEY, btree (id)
Check constraints:
"col1_check" CHECK (col1::text = 'test'::text)
--在pub_db上的同步表插入两条测试数据
pub_db=# insert into logical_tb1 values(1,'test','999.99','{"ja":"1"}','ha=>1',now(),'{9}');
INSERT 0 1
pub_db=# insert into logical_tb1 values(2,'test2','999.99','{"ja":"1"}','ha=>1',now(),'{9}');
INSERT 0 1
--检查sub_db上同步表的数据同步情况
sub_db=# select * from logical_tb1 ;
id | col1 | col2 | col3 | col4 | col5 | col6 | col7
----+------+--------+------------+-----------+-------------------------------+------+------
1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9} |
(1 row)
--检查日志发现了报错:
ERROR: new row for relation "logical_tb1" violates check constraint "col1_check"
DETAIL: Failing row contains (2, test2, 999.99, {"ja":"1"}, "ha"=>"1", 2018-04-11 15:29:12.395614+08, {9}, null).
--删除sub_db上同步表的约束后,数据继续同步
sub_db=# alter table logical_tb1 drop CONSTRAINT col1_check;
ALTER TABLE
sub_db=# select * from logical_tb1 ;
id | col1 | col2 | col3 | col4 | col5 | col6 | col7
----+-------+--------+------------+-----------+-------------------------------+------+------
1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9} |
2 | test2 | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:29:12.395614+08 | {9} |
(2 rows)
外键约束
先给出结论:如果发布端没有外键约束条件,而订阅端有外键约束条件,同步数据不受订阅端外键约束条件影响
--订阅者端创建非同步表
sub_db=# create table logical_tb2(id int primary key,for_id int);
CREATE TABLE
--订阅者端同步表添加列col8并添加外键约束,引用logical_tb2的主键
sub_db=# alter table logical_tb1 add column col8 int references logical_tb2(id);
ALTER TABLE
--订阅端插入初始化数据
sub_db=# insert into logical_tb2 values(1,1),(2,2),(3,3);
INSERT 0 3
sub_db=# select * from logical_tb2;
id | for_id
----+--------
1 | 1
2 | 2
3 | 3
(3 rows)
--发布者端插入违反外键约束的数据(发布者端没有外加约束)
pub_db=# insert into logical_tb1 values(5,'test','999.99','{"ja":"1"}','ha=>1',now(),'{9}','beijing.haidian',4);
INSERT 0 1
--观察订阅者端的数据,依然能同步进来,不受外键约束的影响
sub_db=# select * from logical_tb1;
id | col1 | col2 | col3 | col4 | col5 | col6 | col7 | col8
----+------+--------+------------+-----------+-------------------------------+------+-----------------+------
1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9} | |
2 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:40:28.794865+08 | {9} | |
3 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:39:15.537949+08 | {9} | beijing.haidian | 1
4 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:40:28.885829+08 | {9} | beijing.haidian | 4
5 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:51:50.167068+08 | {9} | beijing.haidian | 4
(5 rows)