Kafka Source Code Analysis - Producer (3): RecordAccumulator (Part 2)

1. BufferPool

Creating and releasing ByteBuffers is relatively expensive. To use memory efficiently, the Kafka client uses a BufferPool to reuse ByteBuffers.

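The figure that listed the fields is not reproduced here; as a stand-in, the following abridged sketch shows the core fields roughly as they appear in the 0.10.x source (metrics-related fields are omitted and the constructor is simplified):

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Abridged field layout of BufferPool; not the full class.
    public final class BufferPool {
        private final long totalMemory;         // upper bound on all memory managed by this pool (buffer.memory)
        private final int poolableSize;         // the single ByteBuffer size that is pooled (batch.size)
        private final ReentrantLock lock;       // guards all mutable state below
        private final Deque<ByteBuffer> free;   // cached, reusable ByteBuffers of size poolableSize
        private final Deque<Condition> waiters; // Conditions of threads blocked waiting for memory
        private long availableMemory;           // unpooled memory still available for allocation

        public BufferPool(long totalMemory, int poolableSize) { // simplified constructor
            this.totalMemory = totalMemory;
            this.poolableSize = poolableSize;
            this.lock = new ReentrantLock();
            this.free = new ArrayDeque<>();
            this.waiters = new ArrayDeque<>();
            this.availableMemory = totalMemory;
        }
    }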

Each BufferPool object only manages ByteBuffers of one particular size (specified by the poolableSize field); ByteBuffers of any other size are not cached in the BufferPool. Normally we tune the size of a MemoryRecords buffer (specified by RecordAccumulator.batchSize) so that each MemoryRecords can hold multiple messages. However, when a single message is larger than batchSize, the ByteBuffers cached in the BufferPool are not reused; an extra ByteBuffer is allocated instead, and once it has been used it is not handed back to the BufferPool but left for the GC to reclaim. If this happens frequently, the batchSize configuration should be reconsidered.
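For reference, batchSize corresponds to the producer's batch.size setting and the pool's total capacity to buffer.memory. Below is a minimal, hypothetical configuration sketch; the broker address and the sizes are placeholders, not recommendations:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerBatchSizeConfig {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            // batch.size becomes RecordAccumulator.batchSize, i.e. the BufferPool's poolableSize.
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);           // 32 KB per RecordBatch
            // buffer.memory becomes BufferPool.totalMemory.
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024); // 64 MB in total
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                // the producer would be used here
            }
        }
    }
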
The key fields of BufferPool:

  • free: a Deque<ByteBuffer> that caches ByteBuffers of the pooled size.
  • lock: a ReentrantLock. Multiple threads allocate and release ByteBuffers concurrently, so this lock protects the pool's state and keeps it thread-safe.
  • waiters: records the threads blocked because they could not obtain enough space; what the queue actually stores are the Condition objects those blocked threads wait on.
  • totalMemory: the total amount of memory managed by the pool, i.e. the hard upper limit on all allocations.
  • availableMemory: the amount of memory currently available for new allocations outside the free list; totalMemory is split between availableMemory, the ByteBuffers cached in the free list, and the buffers that have been handed out.

BufferPool.allocate() is responsible for obtaining a ByteBuffer from the pool; when the pool does not have enough space it blocks the calling thread. The allocation process works as follows:
 /**
     * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
     * is configured with blocking mode.
     * 
     * @param size The buffer size to allocate in bytes
     * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
     * @return The buffer
     * @throws InterruptedException If the thread is interrupted while blocked
     * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
     *         forever)
     */
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        this.lock.lock(); // lock to synchronize concurrent allocations
        try {
            // check if we have a free buffer of the right size pooled
            // the request is for a poolableSize ByteBuffer and the free list has one available
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst(); // reuse a pooled ByteBuffer

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            // the request is not for poolableSize (or free is empty), so handle it below;
            // every ByteBuffer in free has size poolableSize, so the free list's total size is easy to compute
            int freeListSize = this.free.size() * this.poolableSize;
            if (this.availableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request
                // freeUp() keeps releasing ByteBuffers from the free list until
                // availableMemory is large enough to satisfy this request
                freeUp(size);
                this.availableMemory -= size; // charge the allocation against availableMemory
                lock.unlock(); // unlock before allocating
                // the pooled buffers are not used here; a new HeapByteBuffer of the requested size is allocated
                return ByteBuffer.allocate(size);
            } else {
                // we are out of memory and will have to block
                // not enough memory; this thread has to block
                int accumulated = 0;
                ByteBuffer buffer = null;
                Condition moreMemory = this.lock.newCondition();
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory); // register this thread's Condition in waiters
                // loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                while (accumulated < size) { // loop until enough memory has been accumulated
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); // block until signalled or timed out
                    } catch (InterruptedException e) {
                        // interrupted: remove this thread's Condition before rethrowing
                        this.waiters.remove(moreMemory);
                        throw e;
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }

                    if (waitingTimeElapsed) { // timed out: fail the allocation
                        this.waiters.remove(moreMemory);
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;
                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    // the request is for a poolableSize ByteBuffer and the free list now has one available
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else { // take whatever memory is available now and keep waiting for the rest
                        // we'll need to allocate memory, but we may only get
                        // part of what we need on this iteration
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }
                // enough memory has been obtained; remove this thread's Condition
                // remove the condition for this thread to let the next thread
                // in line start getting memory
                Condition removed = this.waiters.removeFirst();
                if (removed != moreMemory)
                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

                // signal any additional waiters if there is more memory left
                // over for them
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                }

                // unlock and return the buffer
                lock.unlock(); // unlock
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }

Next, the deallocate() method:

 /**
     * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
     * memory as free.
     * 
     * @param buffer The buffer to return
     * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
     *             since the buffer may re-allocate itself during in-place compression
     */
    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock(); // lock to synchronize with concurrent allocations
        try {
            // the returned ByteBuffer has the pooled size, so put it back on the free list
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                // the buffer is not poolableSize, so it is not pooled; only availableMemory is credited and the buffer is left to the GC
                this.availableMemory += size;
            }
            // wake up one thread that is blocked waiting for memory
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock(); // unlock
        }
    }
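
To make the allocate()/deallocate() life cycle concrete, here is a minimal usage sketch. Application code never touches BufferPool directly (only the producer internals do), so treat this purely as an illustration; the pool instance and the batchSize value are assumed to be supplied by the caller:

    import java.nio.ByteBuffer;
    import org.apache.kafka.clients.producer.internals.BufferPool;

    public class BufferPoolUsageSketch {
        // Allocate a buffer, use it, and always give it back to the pool.
        static void writeOneBatch(BufferPool pool, int batchSize) throws InterruptedException {
            // Block at most one second waiting for memory; the real producer passes max.block.ms here.
            ByteBuffer buffer = pool.allocate(batchSize, 1000L);
            try {
                buffer.put((byte) 42); // stand-in for writing record data into the buffer
            } finally {
                // A buffer of poolableSize goes back onto the free list; any other size
                // only credits availableMemory and the buffer itself is left to the GC.
                pool.deallocate(buffer, batchSize);
            }
        }
    }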

2. RecordAccumulator

Having covered MemoryRecords, RecordBatch, and BufferPool, we can now look at RecordAccumulator.

Analysis of the main fields:

  • batches: the mapping from TopicPartition to a collection of RecordBatches. Its type is CopyOnWriteMap, which is thread-safe, but the Deque values are ArrayDeque instances, which are not; so appending new records or draining RecordBatches requires synchronizing on the Deque.
    Each Deque holds the RecordBatches destined for the corresponding TopicPartition.

  • batchSize: the size of the underlying ByteBuffer of each RecordBatch.

  • compression: the compression type.

  • incomplete: the set of RecordBatches that have not been fully sent, backed by a Set<RecordBatch>.

  • free: the BufferPool object.

  • drainIndex: when RecordBatches are exported in bulk by drain(), this index records where the previous drain stopped, so the next drain resumes from that position and no partition is starved.

KafkaProducer.send() calls RecordAccumulator.append() to append the record to the RecordAccumulator. Its main logic is:
1. Look up the Deque for the TopicPartition in the batches collection; if none exists, create one and add it to batches.
2. Lock the Deque with synchronized.
3. Call tryAppend() to try to append the record to the last RecordBatch in the Deque.
4. If the append succeeds, return the RecordAppendResult (which wraps the FutureRecordMetadata).
5. Leave the synchronized block (unlock).
6. The append failed, so request a new ByteBuffer from the BufferPool.
7. Lock the Deque with synchronized again.
8. Retry tryAppend(); if it succeeds, release the ByteBuffer from step 6 and return; otherwise use that ByteBuffer to create a new RecordBatch.
9. Append the record to the newly created RecordBatch and add the RecordBatch to the tail of the Deque.
10. Add the new RecordBatch to the incomplete set.
11. Leave the synchronized block (unlock).
12. Return the RecordAppendResult, which is later used to decide whether to wake up the Sender thread.

 /**
     * Add a record to the accumulator, return the append result
     * <p>
     * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
     * <p>
     *
     * @param tp The topic/partition to which this record is being sent
     * @param timestamp The timestamp of the record
     * @param key The key for the record
     * @param value The value for the record
     * @param callback The user-supplied callback to execute when the request is complete
     * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
     */
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        try {
            // check if we have an in-progress batch
            // 1. find (or create) the Deque for this TopicPartition
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) { // 2. lock the Deque
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // 3. try to append the record to the last RecordBatch in the Deque
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult; // 4. the append succeeded, return
            } // 5. unlock

            // we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // 6. the append failed; allocate a new ByteBuffer from the BufferPool
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // 7. after re-acquiring the Deque lock, call tryAppend() again
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                // 8. if the retry succeeded, release the buffer allocated in step 6 and return
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                // 9. append the record to the newly created RecordBatch and add the batch to the Deque
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                // 10. add the newly created RecordBatch to the incomplete set
                incomplete.add(batch);
                // 11. return the RecordAppendResult
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            } // 12. unlock
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

RecordAccumulator.tryAppend() looks up the last RecordBatch of the corresponding queue in the batches collection and calls that batch's tryAppend() to do the actual append.
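Its body is short; the sketch below is paraphrased from the 0.10.x source (details may differ slightly between versions) and assumes the caller already holds the lock on the Deque:

    // Paraphrased from RecordAccumulator (0.10.x); the caller must hold the lock on deque.
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value,
                                         Callback callback, Deque<RecordBatch> deque) {
        RecordBatch last = deque.peekLast(); // only the last batch ever receives new records
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            if (future == null)
                last.records.close(); // no room left: seal the batch so it can be drained
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
        }
        return null; // no batch, or no room in the last batch: the caller has to create a new one
    }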

  • Why do steps 2 and 7 lock the Deque with synchronized and then retry?
    The Deque here is an ArrayDeque (not thread-safe), so access to it must be synchronized.
  • Why is the work split across several synchronized blocks instead of being done in one?
    Goal: higher throughput.
    Requesting a new ByteBuffer from the BufferPool may block. Suppose all of the appending were done in a single synchronized block, and consider this scenario: thread 1 is sending a large message and has to request new space from the BufferPool, but the pool has no free space, so thread 1 blocks on the pool while still holding the Deque's lock; thread 2 is sending a small message that would fit into the remaining space of the last RecordBatch, yet it has to wait because thread 1 has not released the Deque's lock. With many threads in thread 2's situation, a lot of unnecessary blocking occurs and throughput drops. Splitting the synchronized blocks applies the "hold locks for as short a time as possible" optimization.
  • Why retry after the second lock, i.e. why does step 7 call tryAppend() again?
    Goal: avoid internal fragmentation.
    Consider the following scenario if the second locking did not retry tryAppend(): thread 1 finds that the last RecordBatch has no room, allocates space and appends a newly created RecordBatch to the tail of the Deque; thread 2 runs concurrently and also appends a newly created RecordBatch to the tail. By the logic above, later appends only go to the last RecordBatch in the Deque, so the other newly created batch (RecordBatch4 in the figure) never receives another record and its unused space is wasted: this is internal fragmentation. As shown below:


    (Figure: internal fragmentation)

Back in KafkaProducer.doSend(): its last step checks whether this append to the RecordAccumulator satisfies the condition for waking up the Sender thread, namely that the last RecordBatch of the record's queue is full (or that queue holds more than one RecordBatch) or a new RecordBatch was created:

    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        this.sender.wakeup();
    }

Before the client sends messages to the server, it calls RecordAccumulator.ready() to obtain the set of cluster nodes that are eligible to receive data. The filtering of the cluster's nodes is done from the RecordAccumulator's point of view; the concrete conditions are:
1. The Deque holds more than one RecordBatch, or its first RecordBatch is full.
2. The first RecordBatch has been waiting in the accumulator long enough (timed out).
3. Other threads are blocked waiting for the BufferPool to release space (i.e. the pool is exhausted).
4. A thread is waiting for a flush operation to complete.
5. The Sender thread is about to close.
RecordAccumulator.ready() iterates over every partition in the collection, first looking up the node hosting that partition's leader replica; if any of the five conditions above holds (and the batch is not in its retry backoff period), the node is recorded in the readyNodes set. When the iteration finishes, a ReadyCheckResult object is returned, which records:

  • the set of nodes that satisfy the sending conditions;
  • whether any partition with an unknown leader replica was encountered during the iteration (which suggests the current metadata is stale);
  • the delay until ready() should be called again for the next check.
/**
     * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
     * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
     * partition batches.
     * <p>
     * A destination node is ready to send data if:
     * <ol>
     * <li>There is at least one partition that is not backing off its send
     * <li><b>and</b> those partitions are not muted (to prevent reordering if
     *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
     *   is set to one)</li>
     * <li><b>and <i>any</i></b> of the following are true</li>
     * <ul>
     *     <li>The record set is full</li>
     *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
     *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
     *     are immediately considered ready).</li>
     *     <li>The accumulator has been closed</li>
     * </ul>
     * </ol>
     */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        // records which nodes can be sent to
        Set<Node> readyNodes = new HashSet<>();
        // the delay until ready() needs to be called again
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        // whether any partition's leader replica could not be found in the metadata
        boolean unknownLeadersExist = false;
        // whether any thread is blocked waiting for the BufferPool to release space
        boolean exhausted = this.free.queued() > 0;
        // iterate over batches and check the node hosting each partition's leader replica
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();
            // find the node hosting this partition's leader replica
            Node leader = cluster.leaderFor(part);
            // without a known leader the partition cannot be sent to
            if (leader == null) {
                // unknownLeadersExist = true later triggers a metadata update
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) { // lock the deque before reading it
                    // peek at the first RecordBatch in the deque
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // the five conditions that make a node eligible for sending
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            // record the delay until the next ready() check
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }

After RecordAccumulator.ready() produces the readyNodes set, that set still has to be filtered by NetworkClient before the final set of nodes to send messages to is obtained.

The Sender then calls RecordAccumulator.drain(), which takes that node set, gathers the messages to send, and returns a Map<Integer, List<RecordBatch>> whose key is the node id and whose value is the list of RecordBatches to be sent to that node.
What drain() performs is a mapping conversion: it turns the TopicPartition-to-RecordBatch-collection mapping kept by the RecordAccumulator into a NodeId-to-RecordBatch-collection mapping. Why convert? At the network I/O level the producer sends data to Node endpoints and does not care which TopicPartition the data belongs to, whereas KafkaProducer's business-logic layer produces data per TopicPartition and only cares which TopicPartition a record goes to, not which Node hosts that partition. On each send the Sender thread issues at most one ClientRequest per node; that request bundles the data of several partitions hosted on the node, and the broker parses it on the server side.
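The sketch below shows, in a hedged and simplified form, how the drained result is then consumed per node; produceRequestFor() is a hypothetical placeholder for the request construction that Sender actually performs, and client stands for the NetworkClient instance:

    // Hypothetical consumer of drain()'s result: at most one request per node, covering many partitions.
    Map<Integer, List<RecordBatch>> drained = accumulator.drain(cluster, readyNodes, maxRequestSize, now);
    for (Map.Entry<Integer, List<RecordBatch>> entry : drained.entrySet()) {
        Integer nodeId = entry.getKey();
        List<RecordBatch> batchesForNode = entry.getValue();
        if (batchesForNode.isEmpty())
            continue; // nothing ready for this node in this round
        // produceRequestFor() stands in for bundling all of this node's batches into one ClientRequest.
        ClientRequest request = produceRequestFor(nodeId, batchesForNode);
        client.send(request, now);
    }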

/**
     * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
     * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
     * 
     * @param cluster The current cluster metadata
     * @param nodes The list of node to drain
     * @param maxSize The maximum number of bytes to drain
     * @param now The current unix time in milliseconds
     * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
     */
    public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                                 Set<Node> nodes,
                                                 int maxSize,
                                                 long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();
        // the converted result: NodeId -> list of RecordBatches
        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
        for (Node node : nodes) { // iterate over the ready nodes
            int size = 0;
            // the partitions hosted on this node
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());

            // the RecordBatches to send to this node
            List<RecordBatch> ready = new ArrayList<>();

            /*
             * drainIndex records where the previous drain stopped, so the next drain resumes from there.
             * If draining always started at index 0, the first few partitions could monopolize the requests
             * and starve the others; to make starvation less likely this loop doesn't start at 0.
             */
            int start = drainIndex = drainIndex % parts.size();
            do {
                // the partition to examine in this iteration
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                // Only proceed if the partition has no in-flight batches.
                if (!muted.contains(tp)) {
                    // find the deque for this TopicPartition
                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                    if (deque != null) {
                        synchronized (deque) {
                            RecordBatch first = deque.peekFirst(); // peek at the first RecordBatch
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                // Only drain the batch if it is not during backoff period.
                                if (!backoff) {
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        // there is a rare case that a single batch size is larger than the request size due
                                        // to compression; in this case we will still eventually send this batch in a single
                                        // request
                                        break;
                                    } else {
                                        // take one RecordBatch from the queue and add it to the ready list;
                                        // at most one RecordBatch is drained per deque per call
                                        RecordBatch batch = deque.pollFirst();
                                        batch.records.close(); // close the Compressor and its output stream, making the MemoryRecords read-only
                                        size += batch.records.sizeInBytes();
                                        ready.add(batch);
                                        batch.drainedMs = now;
                                    }
                                }
                            }
                        }
                    }
                }
                // advance drainIndex
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            batches.put(node.id(), ready); // record the NodeId -> RecordBatch list mapping
        }
        return batches;
    }

It is the steady advancing of drainIndex that prevents starvation, as the small demonstration below shows.
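
A tiny, self-contained demonstration of that round-robin traversal (plain Java, no Kafka types; the partition names and the starting index are made up):

    import java.util.Arrays;
    import java.util.List;

    public class DrainOrderDemo {
        public static void main(String[] args) {
            List<String> parts = Arrays.asList("p0", "p1", "p2", "p3");
            int drainIndex = 2; // pretend the previous drain stopped at index 2

            // Same loop shape as drain(): start where the last drain left off, wrap around, visit every partition once.
            int start = drainIndex = drainIndex % parts.size();
            do {
                System.out.println("draining " + parts.get(drainIndex));
                drainIndex = (drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            // Prints p2, p3, p0, p1: the front partitions are not always favored, so none starves.
        }
    }
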
With that, all of the components used along the KafkaProducer.send() path have been covered; the next sections look at how the Sender thread actually sends the messages.
