以太坊源码深入分析(5)-- Ethereum服务和以太坊P2P协议发送广播源码分析

在“以太坊源码深入分析(2)”一文中,我们提到Ethereum作为一个service,被Node 注册进去。Node start的时候会启动其注册的所有服务,Ethereum service也是一样。

一,ethereum service的初始化和启动
初始化方法

func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    if config.SyncMode == downloader.LightSync {
        return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
    }
    if !config.SyncMode.IsValid() {
        return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
    }
    chainDb, err := CreateDB(ctx, config, "chaindata")
    if err != nil {
        return nil, err
    }
    stopDbUpgrade := upgradeDeduplicateData(chainDb)
    chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
    if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
        return nil, genesisErr
    }
    log.Info("Initialised chain configuration", "config", chainConfig)

    eth := &Ethereum{
        config:         config,
        chainDb:        chainDb,
        chainConfig:    chainConfig,
        eventMux:       ctx.EventMux,
        accountManager: ctx.AccountManager,
        engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
        shutdownChan:   make(chan bool),
        stopDbUpgrade:  stopDbUpgrade,
        networkId:      config.NetworkId,
        gasPrice:       config.GasPrice,
        etherbase:      config.Etherbase,
        bloomRequests:  make(chan chan *bloombits.Retrieval),
        bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),
    }

    log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)

    if !config.SkipBcVersionCheck {
        bcVersion := core.GetBlockChainVersion(chainDb)
        if bcVersion != core.BlockChainVersion && bcVersion != 0 {
            return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
        }
        core.WriteBlockChainVersion(chainDb, core.BlockChainVersion)
    }
    var (
        vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
        cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
    )
    eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
    if err != nil {
        return nil, err
    }
    // Rewind the chain in case of an incompatible config upgrade.
    if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
        log.Warn("Rewinding chain to upgrade configuration", "err", compat)
        eth.blockchain.SetHead(compat.RewindTo)
        core.WriteChainConfig(chainDb, genesisHash, chainConfig)
    }
    eth.bloomIndexer.Start(eth.blockchain)

    if config.TxPool.Journal != "" {
        config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
    }
    eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

    if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
        return nil, err
    }
    eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
    eth.miner.SetExtra(makeExtraData(config.ExtraData))

    eth.ApiBackend = &EthApiBackend{eth, nil}
    gpoParams := config.GPO
    if gpoParams.Default == nil {
        gpoParams.Default = config.GasPrice
    }
    eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams)

    return eth, nil
}

1,如果config.SyncMode 是 downloader.LightSync,走的是les/backend.go的初始化方法。
2,chainDb, err := CreateDB(ctx, config, "chaindata")打开leveldb,leveldb是eth存储数据库。
3,stopDbUpgrade := upgradeDeduplicateData(chainDb) 检查chainDb版本,如果需要的话,启动后台进程进行升级。
4,chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)装载创世区块。 根据节点条件判断是从数据库里面读取,还是从默认配置文件读取,还是从自定义配置文件读取,或者是从代码里面获取默认值。并返回区块链的config和创世块的hash。
5,装载Etherum struct的各个成员。eventMux和accountManager 是Node 启动 eth service的时候传入的。eventMux可以认为是一个全局的事件多路复用器,accountManager认为是一个全局的账户管理器。engine创建共识引擎。etherbase 配置此Etherum的主账号地址。初始化bloomRequests 通道和bloom过滤器。
6,判断客户端版本号和数据库版本号是否一致
7,eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) 初始化eth的blockchain,也就是eth的区块链
8,eth.blockchain.SetHead(compat.RewindTo) 根据创始区块设置区块头
9,eth.bloomIndexer.Start(eth.blockchain)启动bloomIndexer
10,eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) 初始化eth 区块链的交易池,存储本地生产的和P2P网络同步过来的交易。
11,eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb)初始化以太坊协议管理器,用于区块链P2P通讯
12, miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) 初始化矿工
13,eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams) 创建预言最新gasprice的预言机。

ethereum service的初始化配置了不少东西,基本上涉及到了以太坊区块链系统的所有内容,后续一一分解各个模块。

启动方法

func (s *Ethereum) Start(srvr *p2p.Server) error {
    // Start the bloom bits servicing goroutines
    s.startBloomHandlers()

    // Start the RPC service
    s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

    // Figure out a max peers count based on the server limits
    maxPeers := srvr.MaxPeers
    if s.config.LightServ > 0 {
        if s.config.LightPeers >= srvr.MaxPeers {
            return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
        }
        maxPeers -= s.config.LightPeers
    }
    // Start the networking layer and the light server if requested
    s.protocolManager.Start(maxPeers)
    if s.lesServer != nil {
        s.lesServer.Start(srvr)
    }
    return nil
}

首先启动bloom过滤器
eth 的net 相关Api 加入RPC 服务。
s.protocolManager.Start(maxPeers) 设置最大同步节点数,并启动eth P2P通讯。
如果ethereum service 出问题了才会启动lesServer。

二,ProtocolManager 以太坊P2P通讯协议管理
首先分析一下同在eth目录下的eth/handler.go。
ProtocolManager 的初始化方法

func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
    // Create the protocol manager with the base fields
    manager := &ProtocolManager{
        networkId:   networkId,
        eventMux:    mux,
        txpool:      txpool,
        blockchain:  blockchain,
        chainconfig: config,
        peers:       newPeerSet(),
        newPeerCh:   make(chan *peer),
        noMorePeers: make(chan struct{}),
        txsyncCh:    make(chan *txsync),
        quitSync:    make(chan struct{}),
    }
    // Figure out whether to allow fast sync or not
    if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
        log.Warn("Blockchain not empty, fast sync disabled")
        mode = downloader.FullSync
    }
    if mode == downloader.FastSync {
        manager.fastSync = uint32(1)
    }
    // Initiate a sub-protocol for every implemented version we can handle
    manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
    for i, version := range ProtocolVersions {
        // Skip protocol version if incompatible with the mode of operation
        if mode == downloader.FastSync && version < eth63 {
            continue
        }
        // Compatible; initialise the sub-protocol
        version := version // Closure for the run
        manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
            Name:    ProtocolName,
            Version: version,
            Length:  ProtocolLengths[i],
            Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                peer := manager.newPeer(int(version), p, rw)
                select {
                case manager.newPeerCh <- peer:
                    manager.wg.Add(1)
                    defer manager.wg.Done()
                    return manager.handle(peer)
                case <-manager.quitSync:
                    return p2p.DiscQuitting
                }
            },
            NodeInfo: func() interface{} {
                return manager.NodeInfo()
            },
            PeerInfo: func(id discover.NodeID) interface{} {
                if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                    return p.Info()
                }
                return nil
            },
        })
    }
    if len(manager.SubProtocols) == 0 {
        return nil, errIncompatibleConfig
    }
    // Construct the different synchronisation mechanisms
    manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)

    validator := func(header *types.Header) error {
        return engine.VerifyHeader(blockchain, header, true)
    }
    heighter := func() uint64 {
        return blockchain.CurrentBlock().NumberU64()
    }
    inserter := func(blocks types.Blocks) (int, error) {
        // If fast sync is running, deny importing weird blocks
        if atomic.LoadUint32(&manager.fastSync) == 1 {
            log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
            return 0, nil
        }
        atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
        return manager.blockchain.InsertChain(blocks)
    }
    manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

    return manager, nil
}

1,peers 为以太坊临近的同步网络节点,newPeerCh、noMorePeers、txsyncCh、quitSync对应同步的通知
2,manager.SubProtocols 创建以太坊 P2P server 的 通讯协议,通常只有一个值。manager.SubProtocols,在Node start的时候传给以太坊 P2P server并同时start P2P server。协议里面三个函数指针(Run、NodeInfo、PeerInfo)非常重要,后面会用到。
3,manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
创建了一个下载器,从远程网络节点中获取hashes和blocks。
4,manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。初始化传入的几个参数 都是用于处理同步区块链数据的函数指针

Ethereum service 启动的时候会同时启动ProtocolManager,ProtocolManager的start()方法:

func (pm *ProtocolManager) Start(maxPeers int) {
    pm.maxPeers = maxPeers

    // broadcast transactions
    pm.txCh = make(chan core.TxPreEvent, txChanSize)
    pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
    go pm.txBroadcastLoop()

    // broadcast mined blocks
    pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
    go pm.minedBroadcastLoop()

    // start sync handlers
    go pm.syncer()
    go pm.txsyncLoop()
}

1,创建一个新交易的订阅通道,并启动交易广播的goroutine
2,创建一个挖坑的订阅通道,并启动挖坑广播的goroutine
注:同为订阅通道为什么pm.txSub和pm.minedBlockSub的实现不一样?深入代码会发现pm.txSub用的是event/feed通知方式,pm.minedBlockSub用的是event/TypeMuxEvent通知方式,event/TypeMuxEvent方式将要被Deprecated。
3,pm.syncer() 启动同步goroutine,定时的和网络其他节点同步,并处理网络节点的相关通知
4,pm.txsyncLoop() 启动交易同步goroutine,把新的交易均匀的同步给网路节点

三,ProtocolManager主动向网络节点广播
ProtocolManager Start()方法里面的4个goroutine都是处理ProtocolManager向以太坊网络节点进行广播的。

1,pm.txBroadcastLoop()方法

func (self *ProtocolManager) txBroadcastLoop() {
    for {
        select {
        case event := <-self.txCh:
            self.BroadcastTx(event.Tx.Hash(), event.Tx)

        // Err() channel will be closed when unsubscribing.
        case <-self.txSub.Err():
            return
        }
    }
}

core/tx_pool.go 产生新的交易的时候会send self.txCh,这时候会激活
self.BroadcastTx(event.Tx.Hash(), event.Tx)

func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
    // Broadcast transaction to a batch of peers not knowing about it
    peers := pm.peers.PeersWithoutTx(hash)
    //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
    for _, peer := range peers {
        peer.SendTransactions(types.Transactions{tx})
    }
    log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}

向缓存的没有这个交易hash的网络节点广播此次交易。

2,pm.minedBroadcastLoop()方法

// Mined broadcast loop
func (self *ProtocolManager) minedBroadcastLoop() {
    // automatically stops if unsubscribe
    for obj := range self.minedBlockSub.Chan() {
        switch ev := obj.Data.(type) {
        case core.NewMinedBlockEvent:
            self.BroadcastBlock(ev.Block, true)  // First propagate block to peers
            self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
        }
    }
}

收到miner.go 里面NewMinedBlockEvent 挖到新区块的事件通知,激活self.BroadcastBlock(ev.Block, true)

func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
    hash := block.Hash()
    peers := pm.peers.PeersWithoutBlock(hash)

    // If propagation is requested, send to a subset of the peer
    if propagate {
        // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
        var td *big.Int
        if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
            td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
        } else {
            log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
            return
        }
        // Send the block to a subset of our peers
        transfer := peers[:int(math.Sqrt(float64(len(peers))))]
        for _, peer := range transfer {
            peer.SendNewBlock(block, td)
        }
        log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        return
    }
    // Otherwise if the block is indeed in out own chain, announce it
    if pm.blockchain.HasBlock(hash, block.NumberU64()) {
        for _, peer := range peers {
            peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
        }
        log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
    }
}

如果propagate为true 向网络节点广播整个挖到的block,为false 只广播挖到的区块的hash值和number值。广播的区块还包括这个区块打包的所有交易。

3,pm.syncer() 方法

func (pm *ProtocolManager) syncer() {
    // Start and ensure cleanup of sync mechanisms
    pm.fetcher.Start()
    defer pm.fetcher.Stop()
    defer pm.downloader.Terminate()

    // Wait for different events to fire synchronisation operations
    forceSync := time.NewTicker(forceSyncCycle)
    defer forceSync.Stop()

    for {
        select {
        case <-pm.newPeerCh:
            // Make sure we have peers to select from, then sync
            if pm.peers.Len() < minDesiredPeerCount {
                break
            }
            go pm.synchronise(pm.peers.BestPeer())

        case <-forceSync.C:
            // Force a sync even if not enough peers are present
            go pm.synchronise(pm.peers.BestPeer())

        case <-pm.noMorePeers:
            return
        }
    }
}

pm.fetcher.Start()启动 fetcher,辅助同步区块数据

当P2P server执行 ProtocolManager 的p2p.Protocol 的Run指针的时候会send pm.newPeerCh,这时候选择最优的网络节点(TD 总难度最大的)启动pm.synchronise(pm.peers.BestPeer()) goroutine。

// synchronise tries to sync up our local block chain with a remote peer.
func (pm *ProtocolManager) synchronise(peer *peer) {
    // Short circuit if no peers are available
    if peer == nil {
        return
    }
    // Make sure the peer's TD is higher than our own
    currentBlock := pm.blockchain.CurrentBlock()
    td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

    pHead, pTd := peer.Head()
    if pTd.Cmp(td) <= 0 {
        return
    }
    // Otherwise try to sync with the downloader
    mode := downloader.FullSync
    if atomic.LoadUint32(&pm.fastSync) == 1 {
        // Fast sync was explicitly requested, and explicitly granted
        mode = downloader.FastSync
    } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
        // The database seems empty as the current block is the genesis. Yet the fast
        // block is ahead, so fast sync was enabled for this node at a certain point.
        // The only scenario where this can happen is if the user manually (or via a
        // bad block) rolled back a fast sync node below the sync point. In this case
        // however it's safe to reenable fast sync.
        atomic.StoreUint32(&pm.fastSync, 1)
        mode = downloader.FastSync
    }
    // Run the sync cycle, and disable fast sync if we've went past the pivot block
    if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
        return
    }
    if atomic.LoadUint32(&pm.fastSync) == 1 {
        log.Info("Fast sync complete, auto disabling")
        atomic.StoreUint32(&pm.fastSync, 0)
    }
    atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
    if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
        // We've completed a sync cycle, notify all peers of new state. This path is
        // essential in star-topology networks where a gateway node needs to notify
        // all its out-of-date peers of the availability of a new block. This failure
        // scenario will most often crop up in private and hackathon networks with
        // degenerate connectivity, but it should be healthy for the mainnet too to
        // more reliably update peers or the local TD state.
        go pm.BroadcastBlock(head, false)
    }
}

如果最优的网络节点的TD值大于本地最新区块的TD值,调用pm.downloader.Synchronise(peer.id, pHead, pTd, mode)进行同步。同步完成后再屌用go pm.BroadcastBlock(head, false),把自己最新的区块状态广播出去。

4,pm.txsyncLoop()方法

func (pm *ProtocolManager) txsyncLoop() {
    var (
        pending = make(map[discover.NodeID]*txsync)
        sending = false               // whether a send is active
        pack    = new(txsync)         // the pack that is being sent
        done    = make(chan error, 1) // result of the send
    )

    // send starts a sending a pack of transactions from the sync.
    send := func(s *txsync) {
        // Fill pack with transactions up to the target size.
        size := common.StorageSize(0)
        pack.p = s.p
        pack.txs = pack.txs[:0]
        for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
            pack.txs = append(pack.txs, s.txs[i])
            size += s.txs[i].Size()
        }
        // Remove the transactions that will be sent.
        s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
        if len(s.txs) == 0 {
            delete(pending, s.p.ID())
        }
        // Send the pack in the background.
        s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
        sending = true
        go func() { done <- pack.p.SendTransactions(pack.txs) }()
    }

    // pick chooses the next pending sync.
    pick := func() *txsync {
        if len(pending) == 0 {
            return nil
        }
        n := rand.Intn(len(pending)) + 1
        for _, s := range pending {
            if n--; n == 0 {
                return s
            }
        }
        return nil
    }

    for {
        select {
        case s := <-pm.txsyncCh:
            pending[s.p.ID()] = s
            if !sending {
                send(s)
            }
        case err := <-done:
            sending = false
            // Stop tracking peers that cause send failures.
            if err != nil {
                pack.p.Log().Debug("Transaction send failed", "err", err)
                delete(pending, pack.p.ID())
            }
            // Schedule the next send.
            if s := pick(); s != nil {
                send(s)
            }
        case <-pm.quitSync:
            return
        }
    }
}

当从网络节点同步过来最新的交易数据后,本地也会把新同步下来的交易数据广播给网络中的其他节点。

总结一下
这四个goroutine 基本上就在不停的做广播区块、广播交易,同步到区块、同步到交易,再广播区块、广播交易。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容