pyspark与机器学习

借助于spark的分布式特性,机器学习与spark的结合可以解决数据规模大、复杂运算时间久的问题。
spark提供MLlib组件用于满足机器学习的需求。
本文将从机器学习数据读取、数据操作、特征处理、模型训练、结果评估、模型保存六个方面展开。

一、基础操作

1、sparksession

(1) 创建SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能[1]
任何Spark程序的第一步都是先创建SparkSession。

From pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing').getOrCreate()
(2) SparkSession与SparkContext[1]

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context(例如对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext)。
但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext

至于图中的RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

这些概念在编码过程中偶尔会出现,对于spark分布式运行的详细架构本文就不深入了,只需对SparkSession、SparkContext有个大致的了解。

2、数据加载

(1) 本地数据读取

使用spark方式读取本地csv。

df = spark.read.csv('XXX.csv',inferSchema=True,header=True)

使用pandas方式读取本地csv,转换pandas dataframe为spark dataframe。

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
pandas_df = pd.read_csv('/home/logsaas/pyspark/lalafile/movie_ratings_df.csv')
sc = SparkContext()
sqlContest = SQLContext(sc)
df = sqlContest.createDataFrame(pandas_df)
(2) 数据库读取
# hive数据库读取
spark.sql('select * from XX')
(3) Spark_DataFrame与Pandas_DataFrame区别[2]

Spark中DataFrame与Pandas中DataFrame的主要区别是工作方式的不同。

区别 pandas spark
工作方式 单机模式,没有并行机制,不支持Hadoop,处理大量数据有瓶颈 分布式并行计算框架,所有的数据和操作自动并行分布在各个集群结点上,支持Hadoop,能处理大量数据
DataFrame可变性 可变 Spark中RDDs是不可变的,因此DataFrame也是不可变的
相互转换 从spark_df转换:pandas_df = spark_df.toPandas() 从pandas_df转换:spark_df = SQLContext.createDataFrame(pandas_df)

3、数据操作

由于spark dataframe和python dataframe的区别,随之而来的操作差别也比较大。

操作 pandas spark
取列信息 df[“name”] df.select(“name”)
取满足条件的信息 df[df[‘age’]>21] df.filter(df[‘age’]>21)
groupby df.groupby(“A”).avg(“B”) from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show()
data合并 append/concat等 df.join()
去重统计 len(set(df['title'])) df.select('title').distinct().count()
新增一列 df['newcol']=lists df.withColumn('newcol',lists)
其他 ··· ···

常见的spark dataframe操作如下:

# **基础描述**
spark_df.count()  # 行数
spark_df.columns  # 列名称
spark_df.printSchema()  # 结构及属性显示
spark_df.show(5,False)  # truncate=False表示不压缩显示
spark_df.describe().show()  # 均值/最值等描述

# **dataframe操作**
# 取'age','mobile'两列
spark_df.select('age','mobile').show(5) 
# 新增一列:age_after_10_yrs
spark_df.withColumn("age_after_10_yrs",(spark_df["age"]+10)).show(10,False)
# 新建一列age_double,将age转换为double属性
spark_df.withColumn('age_double',spark_df['age'].cast(DoubleType())).show(10,False)
# 筛选mobile==Vivo的信息
spark_df.filter(spark_df['mobile']=='Vivo').show()
spark_df.filter(spark_df['mobile']=='Vivo').select('age','ratings','mobile').show()
spark_df.filter(spark_df['mobile']=='Vivo').filter(spark_df['experience'] >10).show()
# 去重统计
spark_df.select('mobile').distinct().show() 
# 行去重
spark_df=spark_df.dropDuplicates()
# 删除列
df_new=spark_df.drop('mobile')
# groupby操作
spark_df.groupBy('mobile').count().show(5,False)
spark_df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
spark_df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)  # 根据mobile分区,计算experience的sum
# udf 自建sql函数
from pyspark.sql.functions import udf
def price_range(brand):
    if brand in ['Samsung','Apple']:
        return 'High Price'
    elif brand =='MI':
        return 'Mid Price'
    else:
        return 'Low Price'
brand_udf=udf(price_range,StringType())  # create udf using python function # 输出为string格式
# 新建一列price_range
spark_df.withColumn('price_range',brand_udf(spark_df['mobile'])).show(10,False)
# 使用lamba创建udf
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())  # using lambda function
# 新建一列age_group
spark_df.withColumn("age_group", age_udf(spark_df.age)).show(10,False)

在spark中很少会用for循环去处理一个个特征,一般使用函数/自建UDF,批量处理掉了。

比如计算Review列每个数据的长度。
python模式
review_length = []
for info in text_df['Review']:
···· review_length.apend(length(info))
text_df['length'] = review_length
pyspark模式
from pyspark.sql.functions import length text_df=text_df.withColumn('length',length(text_df['Review']))

3、特征处理

以下为pyspark.ml.feature提供的特征处理功能,满足了大部分机器学习的特征处理需求。

标准化与归一化

函数 备注
NGram 正则标准化,不需要fit 直接transform
Normalizer 归一化函数,使它的范数或者数值范围在一定的范围内
MaxAbsScaler 归一化函数,将列标准化到[0,1]之间,每一个值都除以本列的绝对值最大的数,先fit然后 transform()
MinMaxScaler 最大最小归一化,先fit然后 transform()
StandardScaler 对列进行标准化,先fit再transform

分箱处理

函数 备注
Binarizer 将数值型特征的二值化。将数据框中的某一列按照阈值划分为只包含0,1的列
Bucketizer 将连续特征按照splits值进行分箱
QuantileDiscretizer 将连续列进行分箱操作,numBuckets 表示分箱数目

文本特征处理

函数 备注
CountVectorizer 只考虑词汇在文本中出现的频率
HashingTF 自然语言处理的场景中,hashingTF使用的比较多
IDF TF-IDF 作为特征提取
StringIndexer 针对单个类别型特征进行转换。把字符串的列按照出现频率进行排序,出现次数最高的对应的Index为0,依次下排
IndexToString 索引转字符串
Tokenizer 将字符串列转换成小写并按空格切分
RegexTokenizer 基于正则的方式进行文档切分成单词组
Word2Vec 将words转换成一个vectorSize维的向量
OneHotEncoderEstimator 独热编码相关
RFormula 文本类特征处理,先 StringIndexer 再 OneHotEncoderEstimator

特征操作

函数 备注
Imputer 缺失值填补,默认使用均值或中值(“median”)填补,要计算均值所以要先 fit(),然后再transfrom()
StopWordsRemover 英文停用词移除
SQLTransformer 使用SQL语句创建新的列,直接transform()
BucketedRandomProjectionLSH 基于欧几里德距离的空间度量
MinHashLSH 基于Jaccard距离的空间度量
ElementwiseProduct 计算inputCol与scaling内积,不需要训练,直接transform
PolynomialExpansion 特征变换,将特征拓展比如[x,y],如果degree=2,则拓展成[x,xy,y,xx,y*y],所以直接transform即可
OneHotEncoder 独热编码,特征用一个二进制数字来表示。例如[0, 0, 3]处理后为[ 1., 0., 0., 1., 0., 0., 0., 0., 1.]
ChiSqSelector 依据卡方检验,计算类别特征与分类标签的关联性。该函数只有先训练才能知道挑选哪些特征值,所以要先fit,应用的时候再transform

vector 组合

函数 备注
VectorIndexer 数据集中的类别特征转换。可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引
VectorSizeHint 允许用户显式指定列的向量大小
VectorAssembler 用于将多个列合并为一个向量列,直接transform即可,经常用的
VectorSlicer 通过对这些索引的值进行筛选得到新的向量集

压缩降维

函数 备注
PCA 对特征进行PCA降维,先fit然后 transform()
DCT 离散余弦变换(Discrete Cosine Transform),用于将数据或图像的压缩
FeatureHasher 特征哈希,相当于一种降维技巧
# 正则标准化,不需要fit 直接transform
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df)

# 归一化函数,使它的范数或者数值范围在一定的范围内
normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
normalizer.transform(df).head().features

# 归一化函数,将列标准化到[0,1]之间
maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
model = maScaler.fit(df)
model.transform(df).show()

# 最大最小归一化,先fit然后 transform()
mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
model = mmScaler.fit(df)
model.transform(df)

# 对列进行标准化,先fit再transform
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
model = standardScaler.fit(df)
model.transform(df)

# 按阈值1划分values,结果输出到features,结果为0/1
binarizer = Binarizer(threshold=1.0, inputCol="values", outputCol="features")
new_df = binarizer.transform(df)

# 将连续特征按照splits值进行分箱
bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],inputCol="values", outputCol="buckets")
bucketed = bucketizer.setHandleInvalid("keep").transform(df)

# 将连续列进行分箱操作,numBuckets 表示分箱数目
qds = QuantileDiscretizer(numBuckets=2,inputCol="values", outputCol="buckets", relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
qds.setHandleInvalid("keep").fit(df).transform(df)

# 只考虑词汇在文本中出现的频率
cv = CountVectorizer(inputCol="raw", outputCol="vectors")
model = cv.fit(df)  # model.vocabulary
new_df = model.transform(df)

# 词频统计
hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
hashingTF.transform(df)

# 文本tf-idf计算
idf = IDF(minDocFreq=3, inputCol="tf", outputCol="idf")
model = idf.fit(df)  # model.idf
model.transform(df)

# 针对单个类别型特征进行转换,把字符串的列按照出现频率进行排序
stringIndexer = StringIndexer(inputCol="label",outputCol="indexed", handleInvalid="error",stringOrderType="frequencyDesc")
model = stringIndexer.fit(stringIndDf)
td = model.transform(stringIndDf)

# 将字符串列转换成小写并按空格切分
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df)

# 基于正则的方式进行文档切分成单词组
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
reTokenizer.transform(df)

# Word2Vec
word2Vec = Word2Vec(vectorSize=5, seed=42,inputCol="sentence", outputCol="model")
model = word2Vec.fit(doc)
model.transform(doc)

# OneHotEncoderEstimator
ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
model = ohe.fit(df)
model.transform(df)

# RFormula 文本类特征处理
rf = RFormula(formula="y ~ x + s")
model = rf.fit(df)
model.transform(df)

# 缺失值填补,要计算均值所以要先 fit(),然后再transfrom()
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()

# 移除停顿词
remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
remover.transform(df)

# 使用SQL语句创建新的列
sqlTrans = SQLTransformer(statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df)

# 欧几里德距离的空间度量
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=1.0)
model = brp.fit(df)
new_df = model.transform(df)

# 基于Jaccard距离的空间度量
mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
model = mh.fit(df)
model.transform(df)

# 计算inputCol与scaling内积,不需要训练,直接transform
ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]), inputCol="values", outputCol="eprod")
new_df = ep.transform(df)

# PolynomialExpansion特征变换
px = PolynomialExpansion(degree=2, inputCol="dense", outputCol="expanded")
px.transform(df)

# 独热编码
encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
encoder.transform(td)

# 依据卡方检验特征处理
selector = ChiSqSelector(numTopFeatures=1,outputCol="selectedFeatures")
model = selector.fit(df)
new_df = model.transform(df)

# 数据集中的类别特征转换
indexer = VectorIndexer(maxCategories=2, inputCol="a", outputCol="indexed")
model = indexer.fit(df)
model.transform(df)

# 允许用户显式指定列的向量大小
sizeHint = VectorSizeHint(inputCol="vector", size=3, handleInvalid="skip")  # 先指定大小
vecAssembler = VectorAssembler(inputCols=["vector", "float"], outputCol="assembled")  # 整合成一列
pipeline = Pipeline(stages=[sizeHint, vecAssembler])
pipelineModel = pipeline.fit(df)  # 功能模块合并
pipelineModel.transform(df)

# 用于将多个列合并为一个向量列,直接transform即可,经常用的
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
vecAssembler.transform(df)

# 通过对这些索引的值进行筛选得到新的向量集
vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
vs.transform(df).head().sliced

# PCA
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)
model.transform(df)

# 离散余弦变换压缩数据
dct = DCT(inverse=False, inputCol="vec", outputCol="resultVec")
df2 = dct.transform(df1)

# 特征哈希,相当于一种降维技巧
hasher = FeatureHasher(inputCols=cols, outputCol="features")
hasher.transform(df).head().features

4、模型训练

分类模块

model 备注
LinearSVC 线性分类支持向量机
LogisticRegression 逻辑回归
DecisionTreeClassifier 决策树分类
GBTClassifier GBDT梯度提升决策树
RandomForestClassifier 随机森林
NaiveBayes 朴素贝叶斯
MultilayerPerceptronClassifier 多层感知机分类器
OneVsRest 将多分类问题简化为二分类问题
# 线性分类支持向量机 
svm = LinearSVC(maxIter=5, regParam=0.01)  # maxIter最大迭代次数5次,regParam正则化参数
model = svm.fit(df)
# 逻辑回归
blor = LogisticRegression(regParam=0.01, weightCol="weight") 
blorModel = blor.fit(bdf)
# 决策树分类
dt = DecisionTreeClassifier(maxDepth=2, labelCol="indexed")  # 限定决策树的最大可能深度为2
model = dt.fit(td)
# GBDT梯度提升决策树
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
model = gbt.fit(td)
# 随机森林
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
model = rf.fit(td)
# 朴素贝叶斯
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
model = nb.fit(df)
# 多层感知机
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[2, 2, 2], blockSize=1, seed=123)
model = mlp.fit(df)
# OneVsRest
lr = LogisticRegression(regParam=0.01)
ovr = OneVsRest(classifier=lr)
model = ovr.fit(df)

聚类模块

model 备注
BisectingKMeans 二分类KMeans
KMeans k均值聚类算法
GaussianMixture 高斯混合模型
LDA LDA主题聚类
PowerIterationClustering 幂迭代聚类
# 二分类KMeans
bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
model = bkm.fit(df)
transformed = model.transform(df)
# k均值聚类算法
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(df)
transformed = model.transform(df)
# 高斯混合模型
gm = GaussianMixture(k=3, tol=0.0001,maxIter=10, seed=10)
model = gm.fit(df)
transformed = model.transform(df)
# LDA主题聚类
lda = LDA(k=2, seed=1, optimizer="em")
model = lda.fit(df)
# 幂迭代聚类
pic = PowerIterationClustering(k=2, maxIter=40,weightCol="weight")
assignments = pic.assignClusters(df)

回归模块

model 备注
AFTSurvivalRegression 生存分析的对数线性模型
DecisionTreeRegressor 决策树回归模型
GBTRegressor 全称梯度下降树回归模型
IsotonicRegression 保序回归
# 生存分析的对数线性模型
aftsr = AFTSurvivalRegression()
model = aftsr.fit(df)
# 决策树回归模型
dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
model = dt.fit(df)
# 全称梯度下降树回归模型
gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
model = gbt.fit(df)
# 保序回归
ir = IsotonicRegression()
model = ir.fit(df)

推荐模块

model 备注
ALS (Alternatingleastsquares。交替最小二乘法
# 推荐系统
als = ALS(rank=10, maxIter=5, seed=0)
model = als.fit(df)

5、结果评估

model 备注
BinaryClassificationEvaluator 二分类评估
RegressionEvaluator 回归评估
MulticlassClassificationEvaluator 多分类评估
ClusteringEvaluator 聚类评估
evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw")
evaluator.evaluate(dataset)

evaluator = RegressionEvaluator(predictionCol="raw")
evaluator.evaluate(dataset)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(dataset)

evaluator = ClusteringEvaluator(predictionCol="prediction")
evaluator.evaluate(dataset)

6、模型保存

直接使用save保存模型,使用load加载训练结果。

# rf_classifier为RandomForestClassificationModel训练完的模型
rf_classifier.save("xxx/RF_model")

# 模型调用
rf=RandomForestClassificationModel.load("xxx/RF_model")
model_preditions=rf.transform(test_df)

二、运行案例

对上面的各个过程的方法进行组装,以随机森林代码为例:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler  # 特征处理
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassificationModel


spark=SparkSession.builder.appName('random_forest').getOrCreate()


# 数据读入
pandas_df = pd.read_csv('xxx/affairs.csv')
sc = SparkContext()  # 初始化;SparkContext则是客户端的核心
sqlContest = SQLContext(sc)  # SQLContext是Spark SQL进行结构化数据处理的入口
df = sqlContest.createDataFrame(pandas_df)  # pandas df 转换为 spark df 格式


# 数据属性
print((df.count(),len(df.columns)))
df.printSchema()
df.describe().select('summary','rate_marriage','age','yrs_married','children','religious').show()
df.groupBy('affairs').count().show()
df.groupBy('rate_marriage').count().show()
df.groupBy('rate_marriage','affairs').count().orderBy('rate_marriage','affairs','count',ascending=True).show()
df.groupBy('religious','affairs').count().orderBy('religious','affairs','count',ascending=True).show()
df.groupBy('children','affairs').count().orderBy('children','affairs','count',ascending=True).show()
df.groupBy('affairs').mean().show()


# 特征处理
df_assembler = VectorAssembler(inputCols=['rate_marriage', 'age', 'yrs_married', 'children', 'religious'], outputCol="features")  # 把特征组装成一个list
df = df_assembler.transform(df)
df.printSchema()
df.show(5,truncate=False)


# 数据集划分
model_df=df.select(['features','affairs'])
train_df,test_df=model_df.randomSplit([0.75,0.25])
train_df.count()
train_df.groupBy('affairs').count().show()
test_df.groupBy('affairs').count().show()


# 模型构建
rf_classifier=RandomForestClassifier(labelCol='affairs',numTrees=50).fit(train_df)
rf_predictions=rf_classifier.transform(test_df)
rf_predictions.show()
# 结果查看
rf_classifier.featureImportances  # 各个特征的权重


# 模型效果
rf_predictions.groupBy('prediction').count().show()
rf_predictions.select(['probability','affairs','prediction']).show(10,False)
# 多分类模型——准确率
rf_accuracy=MulticlassClassificationEvaluator(labelCol='affairs',metricName='accuracy').evaluate(rf_predictions)
print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))
print(rf_accuracy)
# 多分类模型——精确率
rf_precision=MulticlassClassificationEvaluator(labelCol='affairs',metricName='weightedPrecision').evaluate(rf_predictions)
print('The precision rate on test data is {0:.0%}'.format(rf_precision))
# AUC
rf_auc=BinaryClassificationEvaluator(labelCol='affairs').evaluate(rf_predictions)
print(rf_auc)


# 模型保存
rf_classifier.save("/home/logsaas/pyspark/lalafile/RF_model")

# 模型调用
rf=RandomForestClassificationModel.load("/home/logsaas/pyspark/lalafile/RF_model")
model_preditions=rf.transform(test_df)
model_preditions.show()

参考资料

[1] SparkSession与SparkContext:https://blog.csdn.net/qq_35495339/article/details/98119422
[2] Spark中DataFrame与Pandas中DataFrame的区别:https://blog.csdn.net/u013129944/article/details/80019546
[3] pyspark 官网解释:http://spark.apache.org/docs/latest/api/python/pyspark.ml.html
[4] pyspark.ml.feature函数中文简介:https://blog.csdn.net/yw_vine/article/details/80117759
[5] Spark特征工程:https://blog.csdn.net/u012050154/article/details/60766387
[6] http://dblab.xmu.edu.cn/blog/1709-2/ 子雨
[7] Machine Learning with PySpark by Pramod Singh
[8] https://github.com/Apress/machine-learning-with-pyspark

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容