ZooKeeper服务端处理请求 单机

在Zookeeper服务端启动源码中,在启动ServerCnxnFactory工厂时调用ServerCnxnFactory#startup方法,启动服务端接收客户端连接工厂,默认为NIOServerCnxnFactory,所以处理请求也是以NIOServerCnxnFactory分析。

处理客户端连接

接收线程调用的是NIOServerCnxnFactory#run方法,处理客户端channel

while (!ss.socket().isClosed()) {
  try {
    selector.select(1000);
    Set<SelectionKey> selected;
    synchronized (this) {
      selected = selector.selectedKeys();
    }
    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
    Collections.shuffle(selectedList);
    for (SelectionKey k : selectedList) {
      if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
        InetAddress ia = sc.socket().getInetAddress();
        int cnxncount = getClientCnxnCount(ia);
        // 1
        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
          sc.close();
        } else {
          //2
          sc.configureBlocking(false);
          SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
          NIOServerCnxn cnxn = createConnection(sc, sk);
          sk.attach(cnxn);
          addCnxn(cnxn);
        }
      } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
        //3
        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
        c.doIO(k);
      } else {
        //log
      }
    }
    selected.clear();
  }
}
  1. 判断同一个ip连接数是否超限
  2. 新连接:创建NIOServerCnxn实例cnxn并调用sk.attach(cnxn)
  3. 读写:取出attachment并处理IO

看到这里我有两个问题

  • selector.select(1000),这个NIO epoll bug修复了吗?
  • c.doIO(k);IO处理单线程的,性能有问题,虽然后面会将请求交由其他线程处理,但也是单线程的

我这里看的版本是3.4.13,在3.5.3-beta版本至少改成多线程版本了

3.4.13 NIOServerCnxnFactory#start

public void start() {
  // ensure thread is started once and only once
  if (thread.getState() == Thread.State.NEW) {
    thread.start();
  }
}

3.5.3-beta NIOServerCnxnFactory#start

public void start() {
    stopped = false;
    // IO线程池
    if (workerPool == null) {
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    // selectorThreads作用类似于Netty中的childGroup
    for(SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // 接收连接线程
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    // 释放过期连接线程
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

在3.5.3-beta中做了如下改进

  • 由原来的单线程改为IO线程池处理(workerPool)
  • 客户端连接的读写事件也改为多线程(selectorThreads),类似于Netty中的childGroup
  • acceptThread作用类似Netty中的parentGroup

处理客户端IO

继续分析NIOServerCnxn#doIO方法

void doIO(SelectionKey k) throws InterruptedException {
  try {
    if (isSocketOpen() == false) {
      return;
    }
    if (k.isReadable()) {
      // 1. incomingBuffer赋值:ByteBuffer incomingBuffer = lenBuffer;
      // 和客户端一样先读4个字节的长度,解决拆包粘包
      int rc = sock.read(incomingBuffer);
      if (rc < 0) {
        throw new EndOfStreamException("");
      }
      if (incomingBuffer.remaining() == 0) {
        boolean isPayload;
        // 当前读的是长度
        if (incomingBuffer == lenBuffer) { // start of next request
          incomingBuffer.flip();
          // 2. 根据读取的长度,重新分配incomingBuffer
          isPayload = readLength(k);
          incomingBuffer.clear();
        } else {
          // continuation
          isPayload = true;
        }
        if (isPayload) { // not the case for 4letterword
          // 3 读取数据并处理
          readPayload();
        } else {
          return;
        }
      }
    }
    if (k.isWritable()) {
      // 1 从outgoingBuffers发送数据
      if (outgoingBuffers.size() > 0) {
        ByteBuffer directBuffer = factory.directBuffer;
        directBuffer.clear();

        // 2. 根据发送队列数据,将数据放到directBuffer一并发送
        for (ByteBuffer b : outgoingBuffers) {
          if (directBuffer.remaining() < b.remaining()) {
            // directBuffer放不下整个buffer,尝试放一部分
            b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
          }
          int p = b.position();
          directBuffer.put(b);
          b.position(p);
          if (directBuffer.remaining() == 0) {
            break;
          }
        }
        directBuffer.flip();

        // 3 发送数据
        int sent = sock.write(directBuffer);
        ByteBuffer bb;

        //4 根据send大小移除buffer队列
        while (outgoingBuffers.size() > 0) {
          bb = outgoingBuffers.peek();
          if (bb == ServerCnxnFactory.closeConn) {
            throw new CloseRequestException("close requested");
          }
          int left = bb.remaining() - sent;
          if (left > 0) {
            // 这个buff是上面slice之后还有剩余的,剩下的buffer都没发,跳出
            bb.position(bb.position() + sent);
            break;
          }
          packetSent();
          /* 移除已发送的buffer */
          sent -= bb.remaining();
          outgoingBuffers.remove();
        }
      }

      synchronized(this.factory) {
        if (outgoingBuffers.size() == 0) {
          if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
            throw new CloseRequestException("responded to info probe");
          }
          sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE));
        } else {
          sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
        }
      }
    }
  } catch (CancelledKeyException e) {
    close();
  } catch (CloseRequestException e) {
    // expecting close to log session closure
    close();
  } catch (EndOfStreamException e) {
    close();
  } catch (IOException e) {
    close();
  }
}

读操作

  1. 读数据,和客户端一样先读4个字节的长度,然后根据读得的长度分配buffer,解决拆包粘包问题
  2. 根据读取的长度,重新分配incomingBuffer
  3. 读数据并处理,后面分析

写操作

  1. 从outgoingBuffers发送数据,说明其他操作的结果数据将保存到此发送队列,节点的操作触发的watche事件也会保存到此队列
  2. 根据发送队列数据,将数据放到directBuffer一并发送
  3. 发送directBuffer中的数据
  4. 根据send大小移除buffer队列

读取客户端请求并处理

比如说对于客户端的getData请求,服务端先读取客户端通道的请求内容,然后根据请求的节点信息,从内存数据库中获取数据,然后再将数据返回给客户端。

读取客户端数据

NIOServerCnxn#readPayload方法读取客户端通道数据

private void readPayload() throws IOException, InterruptedException {
  // 1 正常逻辑是不应该进入if块的,因为上一步应该把len个字节的数据全读进去了
  if (incomingBuffer.remaining() != 0) { // have we read length bytes?
    int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
    if (rc < 0) {
      // 数据不完整
      throw new EndOfStreamException("");
    }
  }

  //2 incomingBuffer读满才说明数据是完整的
  if (incomingBuffer.remaining() == 0) { // have we read length bytes?
    packetReceived();
    incomingBuffer.flip();
    // 3 处理请求
    if (!initialized) {
      readConnectRequest();
    } else {
      readRequest();
    }
    // 4 重置两个buffer
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
  }
}
  1. 正常逻辑是不应该进入if块的,因为上一步应该把len个字节的数据全读进去了,如果进入了,还没读到数据,则说明数据包不完整
  2. incomingBuffer读满才说明数据是完整的
  3. 处理请求
  4. 重置两个buffer

zk字节数据流的格式是如下格式,前4个字节的长度length,后面length个长度的数据体,这种数据格式解决TCP数据包的拆包粘包问题。

|----|---------...----------|
 len           data

处理客户端请求

根据客户端连接是否初始化将请求分为连接请求和数据请求,连接请求需要为客户端生成session并将sessionId返回回去,而数据请求则需要处理数据。

创建连接

NIOServerCnxn#readConnectRequest方法读取连接请求,处理并响应,内部调用ZooKeeperServer#processConnectRequest方法处理,processConnectRequest方法流程

  1. 反序列化连接请求

    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    connReq.deserialize(bia, "connect");
    
  2. 根据连接是否只读以及服务器是否只读判断匹配,否则报错

    readOnly = bia.readBool("readOnly");
    if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
      String msg = "。。。"
      throw new CloseRequestException(msg);
    }
    
  3. 根据Zxid判断该连接连接是否可连

    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
      String msg = "。。。"
      throw new CloseRequestException(msg);
    }
    
  4. 处理sessionTimeout

    byte passwd[] = connReq.getPasswd();
    。。。
    cnxn.setSessionTimeout(sessionTimeout);
    
  5. 根据sessionId有无进行重连或创建session操作

    • 创建session

      createSession(cnxn, passwd, sessionTimeout);
      

      有两步

      1. 由sessionTracker创建session,将session按过期时间分桶存放,返回创建的sessionId

        long sessionId = sessionTracker.createSession(timeout);
        
      2. 提交Request,由处理链处理,创建连接是个事务操作,需要保存到内存数据库中,具体后面处理请求部分分析

    • 重连

      cnxn.setSessionId(sessionId);
      reopenSession(cnxn, sessionId, passwd, sessionTimeout);
      
      1. 判断密码是否正确
      2. 根据sessionId构建session,并保存
      3. 返回响应。重连session已经保存到内存数据库,不需再处理。

数据请求

NIOServerCnxn#readRequest方法内部调用ZooKeeperServer#processPacket方法处理请求,流程:

  1. 反序列化请求头

    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");
    
  2. 根据请求头类型分别处理

    if (h.getType() == OpCode.auth) {
     ...
    } else {
      if (h.getType() == OpCode.sasl) {
        ...
      }
      else {
        // 创建请求
        Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                                 h.getType(), incomingBuffer, cnxn.getAuthInfo());
        si.setOwner(ServerCnxn.me);
        // 提交请求
        submitRequest(si);
      }
    }
    

    权限认证类型比较简单,具体分析数据请求Request

  3. 创建请求,提交请求,将请求交由请求链处理

    // ZooKeeperServer#submitRequest(Request)
    firstProcessor.processRequest(si);
    

    在服务端启动源码已经分析了请求链

    // ZooKeeperServer#setupRequestProcessors
    protected void setupRequestProcessors() {
      RequestProcessor finalProcessor = new FinalRequestProcessor(this);
      RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
      ((SyncRequestProcessor)syncProcessor).start();
      firstProcessor = new PrepRequestProcessor(this, syncProcessor);
      ((PrepRequestProcessor)firstProcessor).start();
    }
    
    firstProcessor(PrepRequestProcessor) -> syncProcessor(SyncRequestProcessor) -> finalProcessor(FinalRequestProcessor)
    

    且firstProcessor、syncProcessor是以线程的方式启动的。

    对于PrepRequestProcessor#processRequest方法,仅仅将请求添加到submittedRequests队列

    public void processRequest(Request request) {
      submittedRequests.add(request);
    }
    
  4. 请求链PrepRequestProcessor firstProcessor处理请求

    // PrepRequestProcessor#run
    Request request = submittedRequests.take();
    pRequest(request);
    

    PrepRequestProcessor#pRequest方法内部,就是真正的处理流程

    protected void pRequest(Request request) throws RequestProcessorException {
      request.hdr = null;
      request.txn = null;
    
      try {
        // 1 根据request.type分别处理
        switch (request.type) {
          // 事务操作
          case OpCode.create:
            CreateRequest createRequest = new CreateRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
            break;
          case OpCode.delete:
            DeleteRequest deleteRequest = new DeleteRequest();               
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
            break;
          case OpCode.setData:
            SetDataRequest setDataRequest = new SetDataRequest();                
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
            break;
          ...
          case OpCode.sync:
          case OpCode.exists:
          case OpCode.getData:
          case OpCode.getACL:
          case OpCode.getChildren:
          case OpCode.getChildren2:
          case OpCode.ping:
          case OpCode.setWatches:
            // 非事务操作
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
          default:
            break;
        }
      } catch (KeeperException e) {
        // 2 处理异常
        if (request.hdr != null) {
          request.hdr.setType(OpCode.error);
          request.txn = new ErrorTxn(e.code().intValue());
        }
        LOG.info("...");
        request.setException(e);
      } catch (Exception e) {
         // 处理异常。。。
      }
      request.zxid = zks.getZxid();
      // 3 调用nextProcessor处理
      nextProcessor.processRequest(request);
    }
    
    1. 根据请求类型分别处理,其中事务操作和非事务操作分别处理,非事务操作仅检查session
    2. 需要将异常信息设置到Request中
    3. 调用下一个Processor处理
  5. 处理事务请求
    事务请求会调用PrepRequestProcessor#pRequest2Txn方法,以setData类型为例

    // 设置事务头
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type);
    
    case OpCode.setData:
     // 检查session
      zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
      SetDataRequest setDataRequest = (SetDataRequest)record;
      if(deserialize)
        // 反序列化SetDataRequest
        ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
      path = setDataRequest.getPath();
     // 检测路径是否有效
      validatePath(path, request.sessionId);
     // 针对当前路径节点的未完成变更
      nodeRecord = getRecordForPath(path);
      checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
               request.authInfo);
     version = setDataRequest.getVersion();
      int currentVersion = nodeRecord.stat.getVersion();
     // 比较版本是否正确
      if (version != -1 && version != currentVersion) {
        throw new KeeperException.BadVersionException(path);
      }
      // version+1
     version = currentVersion + 1;
      request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
      nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
      nodeRecord.stat.setVersion(version);
     // 添加到队列中,为保存操作的顺序性
      addChangeRecord(nodeRecord);
      break;
    

    PrepRequestProcessor中主要还是补充事务请求头和一些请求检测,包括session、节点路径、版本等。注意最后的addChangeRecord方法的调用

    void addChangeRecord(ChangeRecord c) {
      synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
      }
    }
    

    将节点变更记录添加到outstandingChanges队列中,主要是为了保证响应的顺序性,在后面FinalRequestProcessor会用到

  6. 请求链SyncRequestProcessor syncProcessor处理请求
    PrepRequestProcessor#pRequest方法最后调用nextProcessor.processRequest(request);继续处理请求,根据链式结构,进入SyncRequestProcessor#processRequest方法,该方法也是将request添加到队列中,等待线程消费处理。这个处理器主要工作是同步,将内存数据及日志写入snapshot文件和flush log文件,不过时间是半随机的,为了确保不是集群中的所有服务器都同时保存快照,在run方法中

    public void run() {
      try {
        int logCount = 0;
    
        // 1 我们这样做是为了确保不是集成中的所有服务器都同时保存快照
        setRandRoll(r.nextInt(snapCount/2));
        while (true) {
          Request si = null;
          if (toFlush.isEmpty()) {
            si = queuedRequests.take();
          } else {
            si = queuedRequests.poll();
            if (si == null) {
              // 请求队列没了,则flush队列
              flush(toFlush);
              continue;
            }
          }
          if (si == requestOfDeath) {
            break;
          }
          if (si != null) {
            // 2 将请求写入日志文件
            if (zks.getZKDatabase().append(si)) {
              logCount++;
              // 3 保存的日志数量过多
              if (logCount > (snapCount / 2 + randRoll)) {
                setRandRoll(r.nextInt(snapCount/2));
                // 滚动日志文件
                zks.getZKDatabase().rollLog();
                // take a snapshot
                if (snapInProcess != null && snapInProcess.isAlive()) {
                  // 当前保存快照线程还在运行,说明此时系统有些繁忙
                  LOG.warn("Too busy to snap, skipping");
                } else {
                  snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                    public void run() {
                      try {
                        zks.takeSnapshot();
                      } catch(Exception e) {
                        LOG.warn("Unexpected exception", e);
                      }
                    }
                  };
                  // 启动新线程保存快照
                  snapInProcess.start();
                }
                logCount = 0;
              }
            } else if (toFlush.isEmpty()) {
              // 到这里其实请求只是一个读请求,而且前面没有堆积请求,直接传递给下一个处理器
              if (nextProcessor != null) {
                nextProcessor.processRequest(si);
                if (nextProcessor instanceof Flushable) {
                  ((Flushable)nextProcessor).flush();
                }
              }
              continue;
            }
            // 4 保存到队列
            toFlush.add(si);
            // 批量
            if (toFlush.size() > 1000) {
              // 5 这个操作会将日志强制刷写到磁盘,然后将toFlush队列传递给下个处理器
              flush(toFlush);
            }
          }
        }
      } catch (Throwable t) {
        handleException(this.getName(), t);
        running = false;
      }
      LOG.info("SyncRequestProcessor exited!");
    }
    
    1. 写快照的时机半随机,为了防止集群中所有的机器一起写快照(消耗性能),造成整个集群响应变慢
    2. 将日志写入磁盘,实际上只调用了flush方法,将数据写入流中
    3. 保存的日志数量过多,则滚动日志(重新创建一个日志文件,写入)并开启一条线程保存快照
    4. 将request加入到toFlush队列
    5. toFlush队列数量达到限制,日志强制刷写到磁盘,然后将toFlush队列传递给下个处理器。因为写磁盘的性能消耗,所以会在请求队列处理完或者堆积的toFlush队列数量过多才批量刷写磁盘,SyncRequestProcessor#flush方法不能频繁调用。
  7. 请求链FinalRequestProcessor finalProcessor处理请求
    ZooKeeperServer#setupRequestProcessors方法中可以看到,finalProcessor并不是以线程启动,其processRequest方法处理事务并将响应发回给客户端。

    synchronized (zks.outstandingChanges) {
      //1 进入while块实际上说明已经出错了
      while (!zks.outstandingChanges.isEmpty()
             && zks.outstandingChanges.get(0).zxid <= request.zxid) {
        //remove
        ...
      }
      // 2 说明是事务操作,处理事务
      if (request.hdr != null) {
        TxnHeader hdr = request.hdr;
        Record txn = request.txn;
        // 处理结果以 ProcessTxnResult rc返回
        rc = zks.processTxn(hdr, txn);
      }
      // 3 需要投票的包,实际上就是事务操作,添加Proposal
      if (Request.isQuorum(request.type)) {
        zks.getZKDatabase().addCommittedProposal(request);
      }
    }
    ...
    // 4 创建响应
    switch (request.type) {
     ...
      case OpCode.setData: {
        lastOp = "SETD";
        rsp = new SetDataResponse(rc.stat);
        err = Code.get(rc.err);
        break;
      }
    }
    ...
    // 5 发送响应
    long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
    ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
    cnxn.sendResponse(hdr, rsp, "response");
    
    1. 进入while块实际上说明已经出错了,因为outstandingChanges是顺序添加的,前面的zxid肯定要比后面的小,如果第0个比当前的request.zxid小,则这个是出错了
    2. 事务操作,处理事务,zks.processTxn处理事务并将结果返回
    3. 需要投票的请求,实际上就是事务操作,添加Proposal
    4. 创建响应,事务请求根据事务操作的结果创建,非事务请求直接获取节点数据
    5. 发送响应
  8. 数据节点操作
    在第7步中,对于数据节点的操作,这里详细分析

    rc = zks.processTxn(hdr, txn);
    
    // ZooKeeperServer#processTxn
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
      ...
      rc = getZKDatabase().processTxn(hdr, txn);
      ...
      return rc;
    }
    

    通过调用DataTree#processTxn方法保存数据到DataTree中

    设置数据:

    case OpCode.setData:
      SetDataTxn setDataTxn = (SetDataTxn) txn;
      rc.path = setDataTxn.getPath();
      rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(),
                        setDataTxn.getVersion(), header.getZxid(), 
                        header.getTime());
    break;
    

    调用DataTree#setData方法设置数据

    public Stat setData(String path, byte data[], int version, long zxid, long time)
      throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
         // 设置数据
        synchronized (n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
        }
         // 触发watch
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
    
    1. 设置数据

    2. 触发watch,就是取出对应path的watcher,然后调用

      watchers = watchTable.remove(path);
      for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
          continue;
        }
        w.process(e);
      }
      

      需要注意的依然是:1. remove方法,也就是一次性的;2. watcher是什么?watcher实际上是cnxn也就客户端连接,在设置watcher的时候可以看到,可以看看NIOServerCnxn#process方法

      synchronized public void process(WatchedEvent event) {
          WatcherEvent e = event.getWrapper();
          sendResponse(h, e, "notification");
      }
      

      服务端发送的是WatchedEvent,是没有数据的,也就是说客户端需要根据event中的信息具体处理。

    获取数据:

    case OpCode.getData: {
      ...
     //  watcher传入的是cnxn
      byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                                             getDataRequest.getWatch() ? cnxn : null);
      rsp = new GetDataResponse(b, stat);
      break;
    }
    

    如果getWatch方法返回true,则将cnxn以watcher传参,DataTree#getData方法中

    public byte[] getData(String path, Stat stat, Watcher watcher)
      throws KeeperException.NoNodeException {
      // 1 获取节点数据
      DataNode n = nodes.get(path);
      if (n == null) {
        throw new KeeperException.NoNodeException();
      }
      synchronized (n) {
        n.copyStat(stat);
        // 添加watcher
        if (watcher != null) {
          // 添加watcher
          dataWatches.addWatch(path, watcher);
        }
        return n.data;
      }
    }
    
    1. 获取节点数据
    2. 添加watcher,这里watcher是cnxn

至此,单机版服务端处理请求流程结束。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容