Redis分布式锁的实现原理 - Redisson和RedisLockRegistry

主要接触到的Redis分布式锁有两种框架RedisLockRegistryRedisson,今天来看下两种框架的实现原理;

RedisLockRegistry

Spring-inintegration-redis中提供的一种实现方式,使用方式如下

@Bean("redisLockRegistry")
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
    return new RedisLockRegistry(redisConnectionFactory, "spring-redis-lock");
}

在配置文件中直接注册成一个Bean,下面是一些构造时的源码;

public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
    // DEFAULT_EXPIRE_AFTER默认是6秒
    this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
}

public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
    Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
    Assert.notNull(registryKey, "'registryKey' cannot be null");
    this.redisTemplate = new StringRedisTemplate(connectionFactory);
    this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
    this.registryKey = registryKey;
    this.expireAfter = expireAfter;
}

构造时生成了一个RedisTemplate用于执行Redis操作,一个ObtainLockScript用来保存获取锁的脚本;

private final class RedisLock implements Lock {
        
    private final String lockKey;

    private final ReentrantLock localLock = new ReentrantLock();

    private volatile boolean unlinkAvailable = true;

    private volatile long lockedAt;
    // 构造的时候只储存一个key
    private RedisLock(String path) {
        this.lockKey = constructLockKey(path);
    }
        
    private String constructLockKey(String path) {
        return RedisLockRegistry.this.registryKey + ":" + path;
    }
    ...
}

内部封装了一个Redis锁的实现,主要包括一个锁的标识lockKey和一个重入锁ReentrantLock

Lock lock = redisLockRegistry.obtain(key);
try {
    if (lock.tryLock(5, TimeUnit.SECONDS)) {
        try {
            ...
        } finally {
            lock.unlock();
        }
    } else {
        throw new GetLockTimeoutException();
    }
} catch (InterruptedException e) {
    throw new GetLockTimeoutException();
}

上面是代码中的基本用法,可以选择封装成一个切面使用起来方便一些,首先来看下获取锁的操作obtain;

private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
public Lock obtain(Object lockKey) {
    Assert.isInstanceOf(String.class, lockKey);
    String path = (String) lockKey;
    return this.locks.computeIfAbsent(path, RedisLock::new);
}

从一个ConcurrentHashMap中根据锁的标识获取一个锁,如果没有则构造一个新的;

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    long now = System.currentTimeMillis();
    // 先是本地的重入锁先尝试加锁
    if (!this.localLock.tryLock(time, unit)) {
        // 如果本地线程加锁失败,直接返回
        return false;
    }
    try {
        long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
        boolean acquired;
        // 本地加锁成功后执行redis加锁
        while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { 
            // 加锁失败且没过超时时间,睡一会再试
            Thread.sleep(100); 
        }
        if (!acquired) {
            // 如果没有获取到锁,把本地锁释放直接返回
            this.localLock.unlock();
        }
        return acquired;
    }
    catch (Exception e) {
        this.localLock.unlock();
        rethrowAsLockException(e);
    }
    return false;
}

看这段逻辑需要一点ReentrantLock的知识,可以先了解下Java8 源码阅读 - AbstractQueuedSynchronizer

首先是本地的重入锁先进行加锁,加锁步骤如下:

  • 如果锁资源没有被其他线程占用,直接加锁,亦或者是本地线程已经占用锁,再次重入也被视为加锁成功;
  • 如果锁资源被其他线程占用,排队等待指定时间,指定时间内获取不到资源则加锁失败;

成功在本地加锁后会执行Redis加锁操作

private boolean obtainLock() {
    Boolean success =
            RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
                    Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
                    String.valueOf(RedisLockRegistry.this.expireAfter));

    boolean result = Boolean.TRUE.equals(success);

    if (result) {
        this.lockedAt = System.currentTimeMillis();
    }
    return result;
}

private static final String OBTAIN_LOCK_SCRIPT =
        "local lockClientId = redis.call('GET', KEYS[1])\n" +
                "if lockClientId == ARGV[1] then\n" +
                "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                "  return true\n" +
                "elseif not lockClientId then\n" +
                "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
                "  return true\n" +
                "end\n" +
                "return false";

利用Redis执行Lua脚本是原子性的特性来实现的,首先redis.call('GET', KEYS[1])来判断是否已经获取到锁,如果发现锁已经被占用,则判断是不是来自同一个客户端的请求,如果是就更新解锁时间(默认6s),如果不是来自同一个客户端的(即多个节点对同一个key操作),只不会允许非第一个客户端的操作;上锁的操作是redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]),key是外面指定的key,而value则是客户端的uuid;

解锁的逻辑也是比较简单,直接调用del或者unlink删除key即可;

private void removeLockKey() {
    if (this.unlinkAvailable) {
        try {
            RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
        }
        catch (Exception ex) {
            LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
                    "falling back to the regular DELETE command", ex);
            this.unlinkAvailable = false;
            RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
        }
    }
    else {
        RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
    }
}

在单节点的Redis架构下,RedisLockRegistry是可以满足一些不复杂的场景的,但是需要考虑锁的续租、Redis集群架构部署等问题的话,这个框架可能就会稍显拉垮,所以大多数情况都是使用Redisson

Redisson

使用方式大致如下

Config config = new Config();
config.useSentinelServers()
        .addSentinelAddress(
                "redis://127.0.0.1:6379",
                "redis://127.0.0.2:6379",
                "redis://127.0.0.3:6379");
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("key");
// 加锁的两种方式
lock.lock();
lock.lock(5, TimeUnit.SECONDS);
lock.unlock();

下面简单分析下Redisson的加锁逻辑

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    ...
}

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 在获取锁后,如果还没有通过调用unlock释放锁,
    // 则保持锁的最大时间。如果leaseTime为-1,则保持锁定直到显式解锁。
    if (leaseTime != -1) {
        // 指定了锁的释放时间
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 没有指定锁的释放时间
    // 尝试加锁
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            // 发现异常
            return;
        }

        // 加锁成功
        if (ttlRemaining == null) {
            // 定时续租
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 里面处理了包括选取节点、重试等操作
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

先看下加锁的逻辑,这里就忽略了集群节点选取等其他代码,直接看这段Lua脚本的命令,执行逻辑如下

  • 检查key是否存在
    • 如果key不存在,直接将field即客户端id+key+线程id组成的uuid对应的value置为1,并更新过期时间,最后返回nil;
    • 如果key存在,则判断field(即上面的uuid)是否存在,如果不存在,则说明该锁的已被其他占用,直接返回key剩余的ttl;如果field存在,则说明自己是锁的持有者,将filed对应的value自增1(重入);

如果没有指定锁的释放时间,将会触发锁续租的操作;

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 记录续租的次数
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        // 执行续租操作
        renewExpiration();
    }
}
// 续租
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        // 已经被停止续租
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                // 停止续租就是将该key从renewal集合中移除
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    // 续租操作有异常,停止续租
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // 重新续租
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getName()),
            internalLockLeaseTime, getLockName(threadId));
}

来看下续租的操作,每个获取锁的线程都会执行续租任务,默认是30 / 3 = 10秒钟重新执行一次续租操作直到下面三个条件其中一个触发

  • redis执行续租命令异常
  • 手动解锁操作
  • 续租操作失败(通常是redis中key不存在了)

来看下续租的Lua命令

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1; 
end;
    return 0;

如果对应key和field都存在,直接通过pexpire命令更新过期时间;

下面是看看获取锁失败的操作

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        if (ttl == null) {
            // 获取锁成功
            return;
        }
        // 获取锁失败
        // 利用redis的pub/sub机制来订阅到锁被释放的消息
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        // 根据是否支持中断来使用netty不同的同步操作
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
                // 重复尝试加锁
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                if (ttl == null) {
                    // 直到获取锁成功
                    break;
                }

                if (ttl >= 0) {
                    try {
                        // 利用Semaphore阻塞直到ttl过去
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

如果tryAcquire返回的值不为null,即表示锁资源被其他线程占用,本地会启动一个死循环重复执行加锁操作,每一次上锁失败拿到ttl后,会利用Semaphore阻塞当前线程直到ttl过去,当然也可能被之前订阅的解锁消息主动唤醒;

protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        // 当收到解锁的通知时会主动把Semaphore阻塞的线程释放
        value.getLatch().release();
    }
}

这是当Redis收到订阅的解锁事件执行的方法,这里会主动把RedissonLockEntry持有的所有被阻塞的线程都释放;

// 解锁的实际逻辑
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        // 取消锁续租操作
        cancelExpirationRenewal(threadId);
        ...
    });

    return result;
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

解锁的逻辑如下

  • 判断这个锁是否被当前线程持有
    • 如果当前线程不是锁的持有者,直接返回nil;
    • 如果当前线程是锁的持有者,将redis保存的数值减1,如果减1后的结果是大于0的,说明该锁是被重复加锁的,这时候需要对锁进行续租,时间为internalLockLeaseTime,并返回为0;
      • 如果减1后的结果为0,将key删除后并发布解锁的消息;
RedisLockRegistry和Redisson对比

不同点:

  • RedisLockRegistry有一个本地加锁的逻辑,只有当本地加锁成功才能继续执行redis加锁逻辑,重入逻辑也是做在本地,所以理论上RedisLockRegistryRedisson会快那么一点点;
  • Redisson有锁续租功能,解决了加锁成功后逻辑执行未完成时锁到期被释放,导致其他资源获取锁的混乱;
RedLock

因为Redis集群主从同步时会有延迟,有可能因为master节点挂掉,master节点的锁还未同步到slave时,slave被选举成master而可能其他线程能在新master上重复获得锁,而导致锁资源加锁混乱的问题;

所以就有了一个RedLock来解决这种问题,大致的思想是在集群上不同节点都尝试加锁,步骤大致如下

  1. 获取当前的时间戳(毫秒)。
  2. 依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。在步骤 2,当向 Redis 设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个 Redis 实例。
  3. 客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
    如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
  4. 如果因为某些原因,获取锁失败(没有在至少 N/2+1 个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功)。

通过在集群大多数实例上获取来保证锁的可用性,比较一个集群内节点都宕机的可能性还是很低的,Redisson内置了该算法的实现,实现类是RedissonRedLock

RedissonRedLock
RLock lock = redissonClient.getLock("key");
RLock lock2 = redissonClient.getLock("key");
RedissonRedLock redLock = new RedissonRedLock(lock, lock2);
redLock.lock();
redLock.unlock();

使用方式如上,下面看下源码具体的实现逻辑

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 每个锁至少等待1500ms,总时间就是1500*数量
    long baseWaitTime = locks.size() * 1500;
    long waitTime = -1;
    if (leaseTime == -1) {
        waitTime = baseWaitTime;
    } else {
        // 如果指定了锁的释放时间
        leaseTime = unit.toMillis(leaseTime);
        waitTime = leaseTime;
        if (waitTime <= 2000) {
            waitTime = 2000;
        } else if (waitTime <= baseWaitTime) {
            // 如果锁的释放时间小于框架内置的等待时间
            // 重制等待时间
            waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
        } else {
            // 如果锁的释放时间大于框架内置的等待时间
            // 重制等待时间
            waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
        }
    }
    
    while (true) {
        // 重复上锁直到成功或者异常
        if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
            return;
        }
    }
}

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 重制锁的释放时间 至少要求是集群获取锁的等待时间以上
    long newLeaseTime = ...;
    // 获取当前集群的时间戳
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    // 每个锁保留的时间是waitTime
    long lockWaitTime = calcLockWaitTime(remainTime);
    // 加锁失败的限制数
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    // 遍历锁
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            // 进行加锁
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // redis访问超时 解锁当前的节点
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            acquiredLocks.add(lock);
        } else {
            // 加锁失败
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                // 如果获取到锁的节点数大于预置的节点数,则判断这次加锁已经成功
                break;
            }
            
            if (failedLocksLimit == 0) {
                // 加锁失败次数已经到达了限制
                // 将已加锁成功的全部解锁
                unlockInner(acquiredLocks);
                if (waitTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // 重制迭代器
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                // 记录失败的次数
                failedLocksLimit--;
            }
        }
        
        if (remainTime != -1) {
            // 计算还剩余的时间
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            // 如果已经没有剩余时间了,将所有获取到锁的节点进行解锁
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        // 更新每个节点锁的超时时间
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
    return true;
}

加锁的逻辑基本上按照RedLock算法来实现的,流程如下:

  1. 每个节点加锁至少等待1500ms,等待总时间就是1500*节点数量;
  2. 按照RedLock算法,计算每个节点获取锁的等待时间;
  3. 从第一个节点开始进行加锁操作,加锁的执行逻辑和上面单节点的一样,如果加锁成功继续下一个节点,如果加锁失败,首先判断加锁成功的节点数是否已经满足最低加锁个数限制,比如5个节点中必须大于等于3个节点;
  • 如果满足则停止加锁,本次加锁操作执行成功;
  • 如果不满足,判断加锁失败次数是否达到上限,如果没有达到上限,记录失败次数并对下一个节点加锁,如果已经达到加锁失败次数的上限,判断本次集群加锁失败,解锁所有成功获取到锁的节点;
  1. 每个节点加锁无论成功或失败,执行完成后都需要判断剩余时间,如果剩余时间已经小于0且未完成所有节点的加锁,判断为加锁失败并进行解锁;
  2. 如果加锁成功,更新所有加锁成功节点的过期时间;
后续

在Redisson3.15.0版本中RedissonRedLock已经不被推荐使用了,理由是现在RLock对象加锁时会自动传播到所有的Slaves,具体文档可以在这里查询;https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock

虽然不再推荐用了,但是长点见识也是可以的;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容