数据收集之binlog同步---Canal

Canal是阿里开源的binlog同步工具。可以解析binlog,并将解析后的数据同步到任何目标存储中。

1

Canal工作原理


    1、mysql master节点将改变记录保存到二进制binlog文件中。

    2、canal 把自己伪装成mysql slave节点,向master节点发送dump binlog请求。master节点收到请求并找到对应binlog文件及binlog位置pos。

    3、master根据pos读取binlog event,不断发往slave节点(也就是canal)。

    4、slave节点收到binlog events并拷贝到slave的中继日志。

    5、slave结点回放中继日志中的event并同步。

    6、新的binlog被master不断广播到slave节点,slave节点源源不断解析同步。

1

2

3

4

5

6

Canal目录结构

canal-1.0.24/

├── bin

│   ├── canal.pid

│   ├── startup.bat 启动canal server脚本

│   ├── startup.sh  启动canal server脚本

│   └── stop.sh    停止canal server脚本

├── conf

│   ├── canal.properties common属性,是全局instance配置文件,被多个instance实例共享

│   ├── canal_test  instance实例配置目录

│   │   ├── instance.properties instance实例配置文件

│   │   ├── meta.dat 记录Instance实例消费binlog position位置等信息

│   ├── example 默认instance实例

│   │   └── instance.properties

│   ├── logback.xml 日志分割

│   └── spring instance实例可选处理模式

│      ├── default-instance.xml

│      ├── file-instance.xml 基于文件

│      ├── group-instance.xml

│      ├── local-instance.xml

│      └── memory-instance.xml 基于内存

├── lib

└── logs

    ├── canal canal server运行日志

    │   └── canal.log

    ├── canal_test instance实例日志

    │   ├── canal_test.log

    │   └── meta.log

    └── example 默认instance实例日志

        └── example.log

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

Canal 2种方式部署

配置Mysql

MySQL 开启Binlog

修改/etc/my.cnf 配置文件,增加如下配置

[root@node2 ~]# vim /etc/my.cnf

#开启binlog

[mysqld]

#binlog文件保存目录及binlog文件名前缀

#binlog文件保存目录: /var/lib/mysql/

#binlog文件名前缀: mysql-binlog

#mysql向文件名前缀添加数字后缀来按顺序创建二进制日志文件 如mysql-binlog.000006 mysql-binlog.000007

log-bin=/var/lib/mysql/mysql-binlog

#选择基于行的日志记录方式

binlog-format=ROW

#服务器 id

#binlog数据中包含server_id,标识该数据是由那个server同步过来的

server_id=1

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

MySQL 配置Canal Server权限

CREATE USER 'canal_sync'@'%' IDENTIFIED BY 'canal_sync123';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal_sync'@'%';

FLUSH PRIVILEGES;

1

2

3

MySQL 建库建表

create database canal_test;

use canal_test;

create table if not exists `user_info`(

  `userid` int,

  `name` varchar(100),

  `age` int

)engine=innodb default charset=utf8;

1

2

3

4

5

6

7

Canal Server单节点模式

1、Canal Server单节点模式,Canal Client可采用多Client多活或单Client进程的方式。但要注意存在的问题。

    存在问题:

        (1)Server端会有单点问题。

        (2)多Client多活,同时工作,并互为主备。但无法保证binlog的顺序。实际情况下一个Client进程加监控就足以满足需要。

2、单节点模式binlog postion偏移量记录的位置:每个instance配置目录下的meta.dat文件中。如canal-1.0.24/conf/canal_test/meta.dat

1

2

3

4

5

Server端配置

下载解压

[root@node3 software]# pwd

/data/software

[root@node3 software]# wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

[root@node3 software]# mkdir canal-1.0.24

[root@node3 software]# tar -zxvf canal.deployer-1.0.24.tar.gz -C canal-1.0.24

1

2

3

4

5

6

7

8

9

配置Instance

[root@node3 conf]# pwd

/data/software/canal-1.0.24/conf

#约定以要同步的库名作为配置文件目录名

[root@node3 conf]# cp -r example/ canal_test

[root@node3 conf]# vim canal_test/instance.properties

#slaveId

#每个instance都会伪装成一个mysql slave节点

#同一个mysql实例(待同步的mysql节点),此slaveId应该唯一

#若是集群模式,则同一集群中,相同的instance,此slaveId应相同

canal.instance.mysql.slaveId = 111101

#master 库

#canal运行时首要连接库 可以为mysql 主库

#如果只能基于从库的binlog,这里用mysql从库也可

canal.instance.master.address = node2:3306

#起始binlog文件

canal.instance.master.journal.name =

#起始binlog偏移量,同步该binlog位点后的数据

canal.instance.master.position =

#起始binlog时间戳,找到该时间戳对应的binlog位点后开始同步

canal.instance.master.timestamp =

#standby 库

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

#mysql 数据库账号

canal.instance.dbUsername = canal_sync

canal.instance.dbPassword = canal_sync123

#默认数据库

canal.instance.defaultDatabaseName =

canal.instance.connectionCharset = UTF-8

#注意:

#(1)这里黑白名单可以覆盖defaultDatabaseName。

#(2)如果Client端 配置了connector.subscribe则会覆盖黑白名单配置

#表过滤--白名单 只监听库表

#如testDB\..*只监听testDB数据库,testDB\.test_1 只监听testDB库中test_1表。多个用逗号分开

canal.instance.filter.regex = .*\\..*

#表过滤--黑名单 排除库表

canal.instance.filter.black.regex =

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

启动

[root@node3 canal-1.0.24]# bin/startup.sh

查看启动日志,根据日志中的报错排查问题

[root@node3 canal-1.0.24]# tail -f logs/canal_test/canal_test.log

1

2

3

4

Client端消费

pom依赖

<dependency>

    <groupId>com.alibaba.otter</groupId>

    <artifactId>canal.client</artifactId>

    <version>1.0.24</version>

</dependency>

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

import java.util.List;

public class ClientSample {

    public static void main(String args[]) {

        // 单连接

        CanalConnector connector = CanalConnectors.newSingleConnector(

                new InetSocketAddress("node3", 11111), "canal_test", "", "");

        // 集群连接

        //CanalConnector connector = CanalConnectors.newClusterConnector("192.168.113.101:2181,192.168.113.102:2181,192.168.113.103:2181/canal/cluster1", "canal_test", "", "");

        //计数器

        int emptyCount = 0;

        //一次最多拉多少条Message

        //注意:

        int batchSize = 1000;

        try {

            //和server建立连接

            connector.connect();

            //订阅表

            connector.subscribe("canal_test.user_info");

            //回滚到上次ack的位置

            connector.rollback();

            //最大空闲次数

            int maxEmptyCount = 1000;

            //死循环去拉取数据

            while (emptyCount < maxEmptyCount) {

                //尝试最多拿batchSize条记录

                //注意:getWithoutAck一次,对应一个Message。

                //一个Message有一个MessageID,还有一个List<CanalEntry.Entry>

                Message message = connector.getWithoutAck(batchSize);

                long batchId = message.getId();

                int size = message.getEntries().size();

                //没有数据---等待

                if (batchId == -1 || size == 0) {

                    emptyCount++;

//                    System.out.println("empty count : " + emptyCount);

                    try {

                        Thread.sleep(1000);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                } else {

                    //拿到数据---开始解析--发送到目的地如kafka/elasticsearch/redis/hbase...等

                    emptyCount = 0;

                    //这里打印内容

                    printEntry(message.getEntries());

                }

                connector.ack(batchId); // 提交确认

                //connector.rollback(batchId); // 处理失败, 回滚数据

            }

            System.out.println("empty too many times, exit");

        } finally {

            connector.disconnect();

        }

    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {

        for (CanalEntry.Entry entry : entrys) {

            //该条数据的数据类型是事务开始或事务结束,不是binlog 二进制数据本身,就跳过继续处理

            //注意:这里是有bug的,当mysql开启binlog,且为Row行模式,且开启了在binlog中显示原始SQL。

            //这时就会有一种新增的类型:CanalEntry.EventType.QUERY

            // 原始

            //if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {

            //    continue;

            //}

            // 修改为 只保留binlog部分对应的Entry并解析

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN //跳过事务开始的Entry

                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND //跳过事务结束的Entry

                    || entry.getHeader().getEventType() == CanalEntry.EventType.QUERY //跳过事务为原始SQL的Entry

                    ) {

                continue;

            }

            //得到当前行变化的数据Before、After

            CanalEntry.RowChange rowChage = null;

            try {

                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

            } catch (Exception e) {

                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),

                        e);

            }

            //事件类型

            CanalEntry.EventType eventType = rowChage.getEventType();

            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",

                    entry.getHeader().getLogfileName(), //binlog文件名

                    entry.getHeader().getLogfileOffset(),//binlog offset

                    entry.getHeader().getSchemaName(),//库名

                    entry.getHeader().getTableName(),//表名

                    eventType //事件类型

            ));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {

                //INSERT

                if (eventType == CanalEntry.EventType.INSERT) {

                    printColumn(rowData.getAfterColumnsList());

                //DELETE

                } else if (eventType == CanalEntry.EventType.DELETE) {

                    printColumn(rowData.getBeforeColumnsList());

                //UPDATE

                } else if (eventType == CanalEntry.EventType.UPDATE) {

                    System.out.println("-------> before");

                    printColumn(rowData.getBeforeColumnsList());

                    System.out.println("-------> after");

                    printColumn(rowData.getAfterColumnsList());

                }

            }

        }

    }

    private static void printColumn(List<CanalEntry.Column> columns) {

        for (CanalEntry.Column column : columns) {

            //列名:列值 该列是否被更新

            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());

        }

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

Insert

#mysql 插入

mysql> insert into user_info(userid,name,age) values(3,'name3',3);

#canal 解析binlog结果

================> binlog[mysql-binlog.000004:1694] , name[canal_test,user_info] , eventType : INSERT

userid : 3    update=true

name : name3    update=true

age : 3    update=true

1

2

3

4

5

6

7

8

Delete

#mysql 删除

mysql> delete from user_info where userid=3;

#canal 解析binlog结果

================> binlog[mysql-binlog.000004:1898] , name[canal_test,user_info] , eventType : DELETE

userid : 3    update=false

name : name3    update=false

age : 3    update=false

1

2

3

4

5

6

7

8

Update

#mysql 修改

mysql> update user_info set name='name13',age=13 where userid=1;

#canal 解析binlog结果

================> binlog[mysql-binlog.000004:2528] , name[canal_test,user_info] , eventType : UPDATE

-------> before

userid : 1    update=false

name : name1    update=false

age : 20    update=false

-------> after

userid : 1    update=false

name : name13    update=true

age : 13    update=true

1

2

3

4

5

6

7

8

9

10

11

12

13

14

Canal Server集群模式(HA模式)

Canal Server集群模式Server端和Client端都采用单主多活的方式。这种协调由Zookeeper承担。

除此之外,Zookeeper还保存了一些元数据信息,如备选Canal Server、当前正在运行的Canal Server、Binlog Position信息等。

1

2

Server端配置

Server1

[root@node3 canal-1.0.24]# vim conf/canal.properties

#同一个canal server集群,此id唯一

canal.id= 1

canal.ip=

#canal client 端访问的端口

canal.port= 11111

#zk的地址 如果多个Canal 集群共享一个ZK,那么每个Canal集群应使用同一且唯一的rootpath

canal.zkServers= node1:2181,node2:2181,node3:2181/canal/cluster1

#canal 持久化数据到zk上的更新频率,单位毫秒

canal.zookeeper.flush.period = 1000

#canal持久化数据到file上的目录,默认和instance.properties为同一目录

canal.file.data.dir = ${canal.conf.dir}

#canal持久化数据到file上的更新频率,单位毫秒

canal.file.flush.period = 1000

#canal内存中可缓存buffer记录数,为2的指数

canal.instance.memory.buffer.size = 16384

#内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小

canal.instance.memory.buffer.memunit = 1024

#canal内存中数据缓存模式

#ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量

#MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小

canal.instance.memory.batch.mode = MEMSIZE

#是否开启心跳检查

canal.instance.detecting.enable = false

#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()

#心跳检查sql

canal.instance.detecting.sql = select 1

#心跳检查频率,单位秒

canal.instance.detecting.interval.time = 3

#心跳检查失败重试次数

canal.instance.detecting.retry.threshold = 3

#心跳检查失败后,是否开启自动mysql自动切换

#心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据

canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery

canal.instance.transaction.size =  1024

#canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒

#mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢

canal.instance.fallbackIntervalInSeconds = 60

#网络链接参数

canal.instance.network.receiveBufferSize = 16384

canal.instance.network.sendBufferSize = 16384

canal.instance.network.soTimeout = 30

#是否忽略DCL的query语句,比如grant/create user等

canal.instance.filter.query.dcl = false

#是否忽略DML的query语句,比如insert/update/delete

canal.instance.filter.query.dml = false

#是否忽略DDL的query语句,比如create table/alater table/drop table/rename table/create index/drop index.

canal.instance.filter.query.ddl = false

#

canal.instance.filter.table.error = false

canal.instance.filter.rows = false

# binlog format/image check

canal.instance.binlog.format = ROW,STATEMENT,MIXED

canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

#ddl语句是否隔离发送,开启隔离可保证每次只返回发送一条ddl数据,不和其他dml语句混合返回

canal.instance.get.ddl.isolation = false

#################################################

#########      destinations        #############

#################################################

#当前节点对应的destinations

#这里定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名目录

canal.destinations= example

#配置文件根目录

canal.conf.dir = ../conf

#开启instance自动扫描

#如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:

#a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动

#b. instance目录删除:卸载对应instance配置,如已启动则进行关闭

#c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作

canal.auto.scan = true

#instance自动扫描的间隔时间,单位秒

canal.auto.scan.interval = 5

#全局配置加载方式

canal.instance.global.mode = spring

#全局lazy模式

canal.instance.global.lazy = false

#全局的manager配置方式的链接信息

#canal.instance.global.manager.address = 127.0.0.1:1099

#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml

#全局的spring配置方式的组件文件

canal.instance.global.spring.xml = classpath:spring/file-instance.xml

#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

Server2

[root@node3 software]# pwd

/data/software

[root@node3 software]# scp -r canal-1.0.24/ root@node1:/data/software

#修改canal.properties

[root@node1 canal-1.0.24]# vim conf/canal.properties

canal.id= 2

1

2

3

4

5

6

7

创建Zookeeper Znode

连上任意一台zookeeper

[root@node1 zookeeper]# bin/zkCli.sh

创建znode

create /canal ""

create /canal/cluster1 ""

1

2

3

4

5

6

启动所有Server

[root@node1 canal-1.0.24]# bin/startup.sh

[root@node3 canal-1.0.24]# bin/startup.sh

1

2

查看Zookeeper中Canal数据

Instance 候选canal server的列表

#可看到两个候选canal server

[zk: localhost:2181(CONNECTED) 17] ls /canal/cluster1/otter/canal/destinations/canal_test/cluster

[192.168.113.103:11111, 192.168.113.101:11111]

1

2

3

Instance 当前服务的canal server

[zk: localhost:2181(CONNECTED) 18] get /canal/cluster1/otter/canal/destinations/canal_test/running

{"active":true,"address":"192.168.113.101:11111","cid":1}

cZxid = 0x2000000028

ctime = Wed Aug 08 03:59:52 CST 2018

mZxid = 0x2000000028

mtime = Wed Aug 08 03:59:52 CST 2018

pZxid = 0x2000000028

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x16515ef4dae0000

dataLength = 57

numChildren = 0

1

2

3

4

5

6

7

8

9

10

11

12

13

Instance对应的消费者

注意:需要在消费者启动后才能看到。

[zk: localhost:2181(CONNECTED) 23] get /canal/cluster1/otter/canal/destinations/canal_test/1001/running

{"active":true,"address":"192.168.113.1:54048","clientId":1001}

cZxid = 0x2000000036

ctime = Wed Aug 08 04:09:25 CST 2018

mZxid = 0x2000000037

mtime = Wed Aug 08 04:09:26 CST 2018

pZxid = 0x2000000036

cversion = 0

dataVersion = 1

aclVersion = 0

ephemeralOwner = 0x36515ef48010001

dataLength = 63

numChildren = 0

1

2

3

4

5

6

7

8

9

10

11

12

13

14

Client端消费

同server单节点模式,只需要修改一行即可。

使用集群连接:

    CanalConnector connector = CanalConnectors.newClusterConnector("192.168.113.101:2181,192.168.113.102:2181,192.168.113.103:2181/canal/cluster1", "canal_test", "", "");

1

2

3

总结和注意

1、生产环境下尽量采用HA的方式。

2、关于Canal消费binlog的顺序,为保证binlog严格有序,尽量不要用多线程。

3、如果Canal消费binlog后的数据要发往kafka,又要保证有序,kafka topic 的partition可以设置成1个分区。

————————————————

版权声明:本文为CSDN博主「wangpei1949」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/wangpei1949/article/details/81501272

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容