logical replication

简述

PostgreSQL 10.0以后的版本在logical decoding(pg9.4 开始支持)的基础上做了进一步的开发,可以直接通过命令的方式在数据库实现表级数据的同步

限制及特性

1、只支持普通表生效,不支持序列、视图、物化视图、外部表、分区表和大对象

2、只支持普通表的DML(INSERT、UPDATE、DELETE)操作,不支持truncate、DDL操作

3、需要同步的表必须设置REPLICA IDENTITY 不能为noting(默认值是default),同时表中必须包含主键,否则delete和update报错

4、一个publisher可以包含一张或多张表,一张表可以有一个或多个publishers

5、一个发布者可以有多个订阅者订阅,一个订阅者也可以同时订阅多个发布者,在同一个数据库下订阅者不能对同一个发布者的表重复订阅(避免数据冲突)

6、逻辑复制不同于流复制,不是严格的主从关系,订阅者端的普通表依然可以进行增删改操作

7、同步表的表结构需要在发布者和订阅者两边保持一致(列的顺序允许不一样,但是列对应的数据类型必须一致)

8、如果订阅者端的数据被误删,想要从发布者重新copy同步表的数据,只能以重建同步表所在的订阅者的方式来实现

环境搭建

配置参数

修改postgresql.conf数据库参数文件(修改这些参数需要重启数据库)
  • 发布者端设置
    设置wal_level级别为logical:wal_level = logical

    设置max_wal_senders,此参数值要不小于max_replication_slots的参数值,默认值是10

    设置max_replication_slots,此参数值不少于subscriptions的个数,默认值是10
  • 订阅者端设置
    设置wal_level级别为logical:wal_level = logical

    设置max_logical_replication_workers,此参数值不少于订阅者的个数,默认是4

    设置max_worker_processes,此参数值不少于max_logical_replication_workers值+1
在pg_hba.conf添加白名单(根据真实情况自行限制网段)

host all repuser 0.0.0.0/0 md5

创建专门用于逻辑复制的超户(建议使用uuid作为密码)

create user repuser superuser login password 'repuser1234';

创建发布者
publisher是逻辑复制的起点
--查看pub_db数据库的发布者
pub_db=# \dRp
                  List of publications
 Name | Owner | All tables | Inserts | Updates | Deletes 
------+-------+------------+---------+---------+---------
(0 rows)

--在pub_db数据库上创建名为mypub的发布者
pub_db=# CREATE PUBLICATION mypub;
CREATE PUBLICATION
pub_db=# 
pub_db=# \dRp
                    List of publications
 Name  |  Owner   | All tables | Inserts | Updates | Deletes 
-------+----------+------------+---------+---------+---------
 mypub | postgres | f          | t       | t       | t
(1 row)

--查看mypub发布的详细信息
pub_db=# \dRp+
                  Publication mypub
  Owner   | All tables | Inserts | Updates | Deletes 
----------+------------+---------+---------+---------
 postgres | f          | t       | t       | t
(1 row)
创建订阅者
subscriber是逻辑复制的下游
--查看sub_db数据库下的订阅者
sub_db=# \dRs
        List of subscriptions
 Name | Owner | Enabled | Publication 
------+-------+---------+-------------
(0 rows)

--在sub_db数据库上创建名为mysub的订阅者
sub_db=# CREATE SUBSCRIPTION mysub CONNECTION 'dbname=pub_db host=pgtest1 user=repuser password=repuser1234 port=6432' PUBLICATION mypub;
NOTICE:  created replication slot "mysub" on publisher
CREATE SUBSCRIPTION
sub_db=# 
sub_db=# 
sub_db=# \dRs
          List of subscriptions
 Name  |  Owner   | Enabled | Publication 
-------+----------+---------+-------------
 mysub | postgres | t       | {mypub}
(1 row)

--查看订阅者mysub的详细信息
sub_db=# \dRs+
                                                                List of subscriptions
 Name  |  Owner   | Enabled | Publication | Synchronous commit |                                      Conninfo                                       
-------+----------+---------+-------------+--------------------+-------------------------------------------------------------------------------------
 mysub | postgres | t       | {mypub}     | off                | dbname=pub_db host=pgtest1 user=repuser password=repuser1234 port=6432
(1 row)
添加需要同步的表
发布者端
--创建表
pub_db=# create table logical_tb1(id int primary key,col1 varchar(8),col2 numeric(10,2),col3 jsonb,col4 hstore,col5 timestamptz default now(),col6 int[],col7 ltree);
CREATE TABLE
pub_db=# insert into logical_tb1(id,col1) select generate_series(1,2000),'tester';
INSERT 0 2000

--添加到发布者mypub
pub_db=# alter publication mypub add table logical_tb1;
ALTER PUBLICATION

--查看发布者的详细信息
pub_db=# \dRp+ mypub
                  Publication mypub
  Owner   | All tables | Inserts | Updates | Deletes 
----------+------------+---------+---------+---------
 postgres | f          | t       | t       | t
Tables:
    "public.logical_tb1"
订阅者端
--创建相同的表
sub_db=# create table logical_tb1(id int primary key,col1 varchar(8),col2 numeric(10,2),col3 jsonb,col4 hstore,col5 timestamptz default now(),col6 int[],col7 ltree);
CREATE TABLE

--刷新一下订阅者
sub_db=# ALTER SUBSCRIPTION mysub REFRESH PUBLICATION;
ALTER SUBSCRIPTION

--查询数据是否已经同步过来
sub_db=# select count(*) from logical_tb1;
 count 
-------
  2000
(1 row)
双向同步

这个是根据逻辑复制表级复制的原理,在pub_db上创建一个发布者x_pub,在sub_db上创建一个订阅者x_sub,来实现表x的数据从pub_db数据库同步到sub_db数据库的x表;

同时在sub_db上创建一个发布者y_pub,在pub_db上创建一个订阅者y_sub,来实现表y的数据从sub_db数据库同步到pub_db数据库的y表

具体的实现方式参考步骤:创建发布者、创建订阅者、添加同步表

约束条件表

对于没有任何约束条件的普通表实现逻辑同步很简单,直接将表添加到发布者即可,但是有约束条件的表如何实现逻辑同步呢?

条件约束

先给出结论:对于条件符合约束的数据在订阅端不影响同步,不符合条件约束的数据在订阅端会同步报错

--在sub_db上的同步表添加一个check 约束
sub_db=# alter table logical_tb1 add constraint col1_check check(col1 = 'test');
ALTER TABLE
sub_db=# \d logical_tb1
                     Table "public.logical_tb1"
 Column |           Type           | Collation | Nullable | Default 
--------+--------------------------+-----------+----------+---------
 id     | integer                  |           | not null | 
 col1   | character varying(8)     |           |          | 
 col2   | numeric(10,2)            |           |          | 
 col3   | json                     |           |          | 
 col4   | hstore                   |           |          | 
 col5   | timestamp with time zone |           |          | now()
 col6   | integer[]                |           |          | 
 col7   | ltree                    |           |          | 
Indexes:
    "logical_tb1_pkey" PRIMARY KEY, btree (id)
Check constraints:
    "col1_check" CHECK (col1::text = 'test'::text)

--在pub_db上的同步表插入两条测试数据
pub_db=# insert into logical_tb1 values(1,'test','999.99','{"ja":"1"}','ha=>1',now(),'{9}');
INSERT 0 1
pub_db=# insert into logical_tb1 values(2,'test2','999.99','{"ja":"1"}','ha=>1',now(),'{9}');
INSERT 0 1

--检查sub_db上同步表的数据同步情况
sub_db=# select * from logical_tb1 ;
 id | col1 |  col2  |    col3    |   col4    |             col5              | col6 | col7 
----+------+--------+------------+-----------+-------------------------------+------+------
  1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  | 
(1 row)

--检查日志发现了报错:
ERROR:  new row for relation "logical_tb1" violates check constraint "col1_check"
DETAIL:  Failing row contains (2, test2, 999.99, {"ja":"1"}, "ha"=>"1", 2018-04-11 15:29:12.395614+08, {9}, null).

--删除sub_db上同步表的约束后,数据继续同步
sub_db=# alter table logical_tb1 drop CONSTRAINT col1_check;
ALTER TABLE
sub_db=# select * from logical_tb1 ;
 id | col1  |  col2  |    col3    |   col4    |             col5              | col6 | col7 
----+-------+--------+------------+-----------+-------------------------------+------+------
  1 | test  | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  | 
  2 | test2 | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:29:12.395614+08 | {9}  | 
(2 rows)
外键约束

先给出结论:如果发布端没有外键约束条件,而订阅端有外键约束条件,同步数据不受订阅端外键约束条件影响

--订阅者端创建非同步表
sub_db=# create table logical_tb2(id int primary key,for_id int);
CREATE TABLE

--订阅者端同步表添加列col8并添加外键约束,引用logical_tb2的主键
sub_db=# alter table logical_tb1 add column col8 int references logical_tb2(id);
ALTER TABLE

--订阅端插入初始化数据
sub_db=# insert into logical_tb2 values(1,1),(2,2),(3,3);
INSERT 0 3

sub_db=# select * from logical_tb2;
 id | for_id 
----+--------
  1 |      1
  2 |      2
  3 |      3
(3 rows)

--发布者端插入违反外键约束的数据(发布者端没有外加约束)
pub_db=# insert into logical_tb1 values(5,'test','999.99','{"ja":"1"}','ha=>1',now(),'{9}','beijing.haidian',4);
INSERT 0 1

--观察订阅者端的数据,依然能同步进来,不受外键约束的影响
sub_db=# select * from logical_tb1;
 id | col1 |  col2  |    col3    |   col4    |             col5              | col6 |      col7       | col8 
----+------+--------+------------+-----------+-------------------------------+------+-----------------+------
  1 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:12:58.921844+08 | {9}  |                 |     
  2 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 15:40:28.794865+08 | {9}  |                 |     
  3 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:39:15.537949+08 | {9}  | beijing.haidian |    1
  4 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:40:28.885829+08 | {9}  | beijing.haidian |    4
  5 | test | 999.99 | {"ja":"1"} | "ha"=>"1" | 2018-04-11 18:51:50.167068+08 | {9}  | beijing.haidian |    4
(5 rows)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,009评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,808评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,891评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,283评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,285评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,409评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,809评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,487评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,680评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,499评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,548评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,268评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,815评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,872评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,102评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,683评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,253评论 2 341