Spark:DataFrame生成HFile 批量导入Hbase

批量加载-Bulk Load

在工作过程中有个需求,需要将DataFrame的数据保存进Hbase,并且在Spark集群并没有安装Hbase,此时对于常规的使用put将DataFrame加载进Hbase的方式不在适用,一方面是没有Hbase,另一方面是数据量比较大,通过Put加载数据太慢。
为了实现自己的需求,测试了两个方案

通过hive关联Hbase将数据加载进去

大致步骤

在之前已经有一篇博文是关于hive关联Hbase的,再次仅仅简述处理过程
1.首先将DataFrame进行处理后保存成文件resultFile
2.根据DataFrame字段建立一张Hive临时表tmp_table1
3.将resultFile通过load data 导入临时表tmp_table1
4.建立一张关联Hbase的表hive_hbase_table
5.通过insert into 将tmp_table1的数据插入hive_hbase_table
此时,实现了将DataFrame存入Hbase的目的,如果Spark与Hbase不在同一个集群,那么两个集群之间只需要传输hive底层文件即可。

此方式的优点和缺点

优点:
1.直接保存为hive底层文件(即DataFrame直接写文件,对于里面的字段不需要过多处理)
2.由于hive表关联了Hbase,我们可以通过hive直接查询Hbase的数据(方便对数据进行二次加工)

缺点:
1.需要进行两次数据加载:
第一次加载是根据因为是load速度还很快,基本可以忽略不计;
但是第二第insert into 会触发mapreduce,执行时间与数据量的大小关系很大。
2.耗费额外的空间:
临时表tmp_table1仅仅是作为一个中转站,但是还是会消耗磁盘空间,这种方式消耗磁盘空间的大小接近最终结果的两倍。
具体参考此篇文章

通过HFile 直接将数据导入Hbase

由于客户对数据导入的速度有一定要求,所以第一个方案成为备选方案。于是在查询资料的情况下,找到了第二个方案。
将DataFrame先保存成HFile,然后通过命令直接将文件HFile load进对应的Hbase表。
接下来将详细记录我在这个过程中的踩坑,脱坑历程。
1.第一坑:包找不到
参考了众多博文(其实都是一样的),有几个方案本想来参考一下的,但是自己用的时候发现有些包根本就找不到。于是又花了点时间去看看对应的类在哪个包,依赖是啥。
在此处给出我所使用的相关包依赖(sbt的,maven对应改一下基本上就可以了)

//hbase的几个包必须要有,否则有些关键的类用不了
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"

2.第二坑:需不需要配置Hbase参数?
因为我们有三个环境,开发环境是有Hbase的,生产没有,而最终数据又要放到第三个集群,所以Hbase 的有无对结果的影响还是比较纠结。
经过测试,如果单纯的在Spark里面生成HFile,则不需要配置额外的信息,直接默认即可,
但是如果在Spark生成HFile向直接导入Hbase则需要配置相关信息。详细的情况后续说明。

3.第三坑:HFile在代码里面load不进去?
一开始我想的是在生成HFile后直接导入Hbase(单纯想测试的),可是HFile死活load不进去,一直出现一下的信息:

18/10/13 15:12:41 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info/5eab4cb95552481faf538552f848b038 first=752-\xE6\x83\xA0\xE5\xB7\x9E last=752-\xE6\x83\xA0\xE5\xB7\x9E
18/10/13 15:12:41 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info/5b39a5e9779e499bb6817affc94c51a9 first=662-\xE9\x98\xB3\xE6\xB1\x9F last=662-\xE9\x98\xB3\xE6\xB1\x9F
18/10/13 15:12:41 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info/2e3334a0a8094b899fb6cef2195becd5 first=751-\xE9\x9F\xB6\xE5\x85\xB3 last=751-\xE9\x9F\xB6\xE5\x85\xB3
18/10/13 15:12:41 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info/e1ebb7ab430e47dfbc05938e037c00de first=759-\xE6\xB9\x9B\xE6\xB1\x9F last=759-\xE6\xB9\x9B\xE6\xB1\x9F
18/10/13 15:23:55 INFO client.RpcRetryingCaller: Call exception, tries=10, retries=35, started=673838 ms ago, cancelled=false, msg=row '' on table 'iptv:spark_test' at region=iptv:spark_test,,1539413066165.047ff19285b4360a8cf82bdade12a465., hostname=iptve2e06,60020,1539323044523, seqNum=2
18/10/13 15:25:10 INFO client.RpcRetryingCaller: Call exception, tries=11, retries=35, started=748954 ms ago, cancelled=false, msg=row '' on table 'iptv:spark_test' at region=iptv:spark_test,,1539413066165.047ff19285b4360a8cf82bdade12a465., hostname=iptve2e06,60020,1539323044523, seqNum=2

找了很久也没有找到解决方法(我猜测可能是读写权限问题,因为我读取数据是没问题的),最后只能在外面执行命令导入数据了。
虽然这个直接在代码处理是不符合我的需求的,但是这个问题没搞定很不爽(后期找到解决方案在发出来)
4.第四坑:只能处理一列数据
我的DataFrame最终的存储目标是一个列族,多个列的,但是目前在网上和官网的都是只能处理单个列族,单个列,与我自己的需求相差甚远。在网上找到一个方案可以实现一列族、多列但是需要修改源代码,这个由于时间关系就放弃。
最后采取一个折中的办法,就是每次处理一列最后合并一下,很蠢的办法,但是目前只想到这个。
在我的另一篇博文Spark:DataFrame写HFile (Hbase)一个列族、一个列扩展一个列族、多个列中实现了多列处理
5.第五坑:用错导入数据的命令
在直接在代码load数据失败后,就寻求其他的办法来load数据
在看了hbase的官方文档后,知道还可以在外部用hbase的命令将数据load进去

//在Hbase官方文档找到的命令
hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile iptv:spark_test

//在某篇博文里面找到的命令
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile iptv:spark_test

当时用了官方文档的命令 直接报找不到包,整个人瞬间不好了。
后面再去搜相关资料看到别人用的命令是下面这个,试了一下居然成功了,在Hbase也能查到对应的数据了,至此,才算是完成了整个流程测试。

实现代码

1.直接在代码里面处理load数据流程
注:这个最后一步我没有走通,问题还在找。

package com.iptv.job.basedata

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.iptv.domain.DatePattern
import com.iptv.job.JobBase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author 利伊奥克儿-lillcol
  *         2018/10/12-15:58
  *
  */
object HbaseHFileTest  {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""
  var conf: Configuration = null


  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = getSQLContext(sc)

  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {

    val date: Date = getCommandParam(args)
    hdfsPath = args(0)
    proPath = args(1)
    DATE = dateHelper.getDateFormatStr(date, DatePattern.DATE_YMD)

    //获取测试DataFrame
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)
    val resultDataFrame: DataFrame = dim_sys_city_dict
      .select(concat($"city_id", lit("-"), $"city_name").as("key"), $"city_id", $"city_name")
    //hbase的表名
    val tableName = "iptv:spark_test"

    //hbase的参数配置
    conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    conf.set("hbase.zookeeper.quorum", "n1,n2,n3")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.master", "n1:60000")
    conf.set("hbase.zookeeper.master", "n1:60000")


    lazy val job = Job.getInstance(conf)
    val con: Connection = ConnectionFactory.createConnection(conf)
    //设置MapOutput Key Value 的数据类型
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])

    //        val table: Table =new HTable(conf,tableName) //这个方法被 deprecated  采用下面的方式
    val table: Table = con.getTable(TableName.valueOf(tableName))


    val cf: Array[Byte] = "cf_info".getBytes //列族
    //对第一个列处理
    val result1: RDD[(ImmutableBytesWritable, KeyValue)] = resultDataFrame
      .map(row => {
        //将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
        val rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
        val clounmVale: Array[Byte] = "city_id".getBytes //列的名称
        val value: Array[Byte] = Bytes.toBytes(row.getAs[String]("city_id")) //列的值
        val kv: KeyValue = new KeyValue(rowkey, cf, clounmVale, value) //封装一下 rowkey, cf, clounmVale, value

        (new ImmutableBytesWritable(rowkey), kv) //生成 KeyValue 这里每次只能处理一列(后续看看有没办法处理多列)
      })
    //对第而个列处理
    val result2: RDD[(ImmutableBytesWritable, KeyValue)] = resultDataFrame
      .map(row => {
        //将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
        val rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
        val clounmVale: Array[Byte] = "city_name".getBytes //列的名称
        val value: Array[Byte] = Bytes.toBytes(row.getAs[String]("city_name")) //列的值
        val kv: KeyValue = new KeyValue(rowkey, cf, clounmVale, value) //封装一下 rowkey, cf, clounmVale, value

        (new ImmutableBytesWritable(rowkey), kv) //生成 KeyValue 这里每次只能处理一列(后续看看有没办法处理多列)
      })

   //保存 到hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile"
    //union两个数据
    result1
      .union(result2)
      .sortBy(x => x._1, true) //要保持 key 整理有序
      .saveAsNewAPIHadoopFile("hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile",
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)

    //    将保存在临时文件夹的hfile数据保存到hbase中
       val load = new LoadIncrementalHFiles(conf)
       load.doBulkLoad(new Path("hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile"), table.asInstanceOf[HTable])

  }

}

2.在外面通过命令load数据

package com.iptv.job.basedata

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.iptv.domain.DatePattern
import com.iptv.job.JobBase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author 利伊奥克儿-lillcol
  *         2018/10/12-15:58
  *
  */
object HbaseHFileTest  {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""
  var conf: Configuration = null


  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = getSQLContext(sc)

  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {


    val date: Date = getCommandParam(args)
    hdfsPath = args(0)
    proPath = args(1)
    DATE = dateHelper.getDateFormatStr(date, DatePattern.DATE_YMD)

    //获取测试DataFrame
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)
    val resultDataFrame: DataFrame = dim_sys_city_dict
      .select(concat($"city_id", lit("-"), $"city_name").as("key"), $"city_id", $"city_name")
    //hbase的表名
    val tableName = "iptv:spark_test"

    //hbase的参数配置
    conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)



    lazy val job = Job.getInstance(conf)
    val con: Connection = ConnectionFactory.createConnection(conf)
    //设置MapOutput Key Value 的数据类型
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])

    //        val table: Table =new HTable(conf,tableName) //这个方法被 deprecated  采用下面的方式
    val table: Table = con.getTable(TableName.valueOf(tableName))


    val cf: Array[Byte] = "cf_info".getBytes //列族
    //对第一个列处理
    val result1: RDD[(ImmutableBytesWritable, KeyValue)] = resultDataFrame
      .map(row => {
        //将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
        val rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
        val clounmVale: Array[Byte] = "city_id".getBytes //列的名称
        val value: Array[Byte] = Bytes.toBytes(row.getAs[String]("city_id")) //列的值
        val kv: KeyValue = new KeyValue(rowkey, cf, clounmVale, value) //封装一下 rowkey, cf, clounmVale, value

        (new ImmutableBytesWritable(rowkey), kv) //生成 KeyValue 这里每次只能处理一列(后续看看有没办法处理多列)
      })
    //对第而个列处理
    val result2: RDD[(ImmutableBytesWritable, KeyValue)] = resultDataFrame
      .map(row => {
        //将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
        val rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
        val clounmVale: Array[Byte] = "city_name".getBytes //列的名称
        val value: Array[Byte] = Bytes.toBytes(row.getAs[String]("city_name")) //列的值
        val kv: KeyValue = new KeyValue(rowkey, cf, clounmVale, value) //封装一下 rowkey, cf, clounmVale, value

        (new ImmutableBytesWritable(rowkey), kv) //生成 KeyValue 这里每次只能处理一列(后续看看有没办法处理多列)
      })

   //s"保存 到hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile")
    //union两个数据
    result1
      .union(result2)
      .sortBy(x => x._1, true) //要保持 key 整理有序
      .saveAsNewAPIHadoopFile("hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile",
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)

  }

}
//通过下面的命令将数据loadhbase
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile iptv:spark_test
//生成的HFile
[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile
0       0       hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/_SUCCESS
20.3 K  40.6 K  hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info

//数据load 进hbase后
[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile
0  0  hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/_SUCCESS
0  0  hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile/cf_info

//在hbase下的文件结构
[hdfs@iptve2e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/hbase/data/iptv/spark_test
290     580     hdfs://ns1/hbase/data/iptv/spark_test/.tabledesc
0       0       hdfs://ns1/hbase/data/iptv/spark_test/.tmp
32.4 K  64.8 K  hdfs://ns1/hbase/data/iptv/spark_test/047ff19285b4360a8cf82bdade12a465

//Hbase 数据情况
hbase(main):005:0> scan 'iptv:spark_test' ,{LIMIT=>5}
ROW                                                          COLUMN+CELL                                                                                                                                                                     
 200-\xE5\xB9\xBF\xE5\xB7\x9E                                column=cf_info:city_id, timestamp=1539496131471, value=200                                                                                                                      
 200-\xE5\xB9\xBF\xE5\xB7\x9E                                column=cf_info:city_name, timestamp=1539496131471, value=\xE5\xB9\xBF\xE5\xB7\x9E                                                                                               
 660-\xE6\xB1\x95\xE5\xB0\xBE                                column=cf_info:city_id, timestamp=1539496131471, value=660                                                                                                                      
 660-\xE6\xB1\x95\xE5\xB0\xBE                                column=cf_info:city_name, timestamp=1539496131471, value=\xE6\xB1\x95\xE5\xB0\xBE                                                                                               
 662-\xE9\x98\xB3\xE6\xB1\x9F                                column=cf_info:city_id, timestamp=1539496131471, value=662                                                                                                                      
 662-\xE9\x98\xB3\xE6\xB1\x9F                                column=cf_info:city_name, timestamp=1539496131471, value=\xE9\x98\xB3\xE6\xB1\x9F                                                                                               
 663-\xE6\x8F\xAD\xE9\x98\xB3                                column=cf_info:city_id, timestamp=1539496131471, value=663                                                                                                                      
 663-\xE6\x8F\xAD\xE9\x98\xB3                                column=cf_info:city_name, timestamp=1539496131471, value=\xE6\x8F\xAD\xE9\x98\xB3                                                                                               
 668-\xE8\x8C\x82\xE5\x90\x8D                                column=cf_info:city_id, timestamp=1539496131994, value=668                                                                                                                      
 668-\xE8\x8C\x82\xE5\x90\x8D                                column=cf_info:city_name, timestamp=1539496131994, value=\xE8\x8C\x82\xE5\x90\x8D                                                                                               
5 row(s) in 0.0590 seconds

hbase(main):006:0> 

至此,将数据通过HFile批量导入到Hbase的目的实现
通过生成HFilede的方式将load Hbase的方式速度是真的快,给人感觉是仅仅是移动了文件

刚接触Hbase 发现还是有点意思的

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,783评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,396评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,834评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,036评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,035评论 5 362
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,242评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,727评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,376评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,508评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,415评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,463评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,140评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,734评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,809评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,028评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,521评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,119评论 2 341

推荐阅读更多精彩内容