



  // One message as held in a service's message queue.
  7 struct skynet_message {
  8     uint32_t source;   // where the message came from
  9     int session;  // which coroutine the message corresponds to
 10     void * data;
 11     size_t sz;  // senders in this file OR a message type into it, shifted by MESSAGE_TYPE_SHIFT
 12 };  // declaration of a single message


 // Per-service message queue; `queue` is used as a ring buffer indexed by head/tail.
 21 struct message_queue {
 22     struct spinlock lock;
 23     uint32_t handle;  // service handle
 24     int cap;  // number of slots in `queue`
 25     int head;  // index of the next message to pop
 26     int tail;  // index of the next slot to push into
 27     int release;
 28     int in_global;   // whether this queue is currently in the global queue
 29     int overload;  // queue length recorded by skynet_mq_pop when it exceeds the threshold
 30     int overload_threshold;  // doubled on overload, reset to MQ_OVERLOAD when the queue is empty
 31     struct skynet_message *queue;  // message array, 64 entries by default
 32     struct message_queue *next;  // presumably the link used by the global queue -- TODO confirm
 33 };

 // Global queue: a list of message_queue objects with head/tail pointers and its own lock.
 35 struct global_queue {
 36     struct message_queue *head;  // points to the head
 37     struct message_queue *tail;   // points to the tail
 38     struct spinlock lock;  // spinlock
 39 };



// Create the global queue: allocate it, zero it, initialise its spinlock and store it in Q.
211 void
212 skynet_mq_init() {
213     struct global_queue *q = skynet_malloc(sizeof(*q));
214     memset(q,0,sizeof(*q));  // head and tail start out NULL
215     SPIN_INIT(q);
216     Q=q;  // Q is defined outside this excerpt
217 }


 // Allocate an empty message queue for the service `handle`.
 // The queue starts with DEFAULT_QUEUE_SIZE slots and is flagged MQ_IN_GLOBAL.
 // Returns the new queue.
 77 struct message_queue *
 78 skynet_mq_create(uint32_t handle) {
 79     struct message_queue *q = skynet_malloc(sizeof(*q));
 80     q->handle = handle;
 81     q->cap = DEFAULT_QUEUE_SIZE;
 82     q->head = 0;
 83     q->tail = 0;
 84     SPIN_INIT(q)
 85     // When the queue is create (always between service create and service init) ,
 86     // set in_global flag to avoid push it to global queue .
 87     // If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
 88     q->in_global = MQ_IN_GLOBAL;
 89     q->release = 0;
 90     q->overload = 0;
 91     q->overload_threshold = MQ_OVERLOAD;
 92     q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);  // slots are left uninitialised
 93     q->next = NULL;
 95     return q;
 96 }



// Pop the oldest message of `q` into *message, under the queue's spinlock.
// Returns 0 when a message was copied out, 1 when the queue was empty.
// On an empty queue, in_global is cleared and overload_threshold is reset.
137 int
138 skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
139     int ret = 1;
140     SPIN_LOCK(q)
142     if (q->head != q->tail) {
143         *message = q->queue[q->head++];  // struct copy; head advances past the popped slot
144         ret = 0;
145         int head = q->head;
146         int tail = q->tail;
147         int cap = q->cap;
149         if (head >= cap) {
150             q->head = head = 0;  // wrap around to the start of the array
151         }
152         int length = tail - head;  // number of messages still queued
153         if (length < 0) {
154             length += cap;  // tail has wrapped but head has not
155         }
156         while (length > q->overload_threshold) {
157             q->overload = length;  // record the backlog and raise the threshold past it
158             q->overload_threshold *= 2;
159         }
160     } else {
161         // reset overload_threshold when queue is empty
162         q->overload_threshold = MQ_OVERLOAD;
163     }
165     if (ret) {
166         q->in_global = 0;  // empty queue: mark it as no longer in the global queue
167     }
169     SPIN_UNLOCK(q)
171     return ret;
172 }

q->head == q->tail表示消息队列为空,此时skynet_mq_pop返回1并执行q->in_global = 0,即把它标记为不在全局队列中;调用方随后通过skynet_globalmq_pop取下一个服务队列:

316     for (i=0;i<n;i++) {
317         if (skynet_mq_pop(q,&msg)) {
318             skynet_context_release(ctx);
319             return skynet_globalmq_pop();

当服务消息队列不为空时,取head索引处消息,并自增head,当head >= cap表示需要回到索引0处等pop下一条消息;剩下的逻辑是计算有多少条消息;


// Append a copy of *message to `q`, under the queue's spinlock.
// The array is grown when the push fills it; if the queue is not flagged as being
// in the global queue it is flagged and handed to skynet_globalmq_push.
189 void
190 skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
191     assert(message);
192     SPIN_LOCK(q)
194     q->queue[q->tail] = *message;  // struct copy into the tail slot
195     if (++ q->tail >= q->cap) {
196         q->tail = 0;  // wrap around to the start of the array
197     }
199     if (q->head == q->tail) {
200         expand_queue(q);  // tail caught up with head: the array is full
201     }
203     if (q->in_global == 0) {
204         q->in_global = MQ_IN_GLOBAL;
205         skynet_globalmq_push(q);
206     }
208     SPIN_UNLOCK(q)
209 }

不管是pop消息还是push消息,都是结构体复制,其中的指针只做了浅拷贝,由上篇分析可知,消息在各服务间基本是zero-copy;这里虽然用数组存储消息,但并未使用常规环形队列的做法(用取模计算索引、留一个空位表示队列满);当++ q->tail >= q->cap时,表示需要回绕到数组索引0;当q->head == q->tail时表示队列已满,需要扩容;然后如果不在全局队列中(或者第一次进全局队列,或者因没消息被pop出全局队列)的话需要push到全局队列等工作线程处理;

// Double the capacity of `q`. Called by skynet_mq_push when the array is full.
// Messages are copied in queue order starting from head, so afterwards
// head is 0 and tail is the old capacity. The old array is freed.
174 static void
175 expand_queue(struct message_queue *q) {
176     struct skynet_message *new_queue = skynet_malloc(sizeof(struct skynet_message) * q->cap * 2)    ;
177     int i;
178     for (i=0;i<q->cap;i++) {
179         new_queue[i] = q->queue[(q->head + i) % q->cap];  // modulo handles a wrapped queue
180     }
181     q->head = 0;
182     q->tail = q->cap;
183     q->cap *= 2;
185     skynet_free(q->queue);
186     q->queue = new_queue;
187 }

扩容就是以两倍的大小,然后把原来的消息重新push到新数组中,获取原来的消息以q->queue[(q->head + i) % q->cap],因为可能出现q->head>q->tail的情况;




// Timer thread entry point: loops calling skynet_updatetime() and sleeping in between.
// Parts of the original function are elided in this excerpt.
128 static void *
129 thread_timer(void *p) {
130     //more code...
132     for (;;) {
133         skynet_updatetime();
136         usleep(2000); // every 2 milliseconds (2000 microseconds)
137         //more code...
141     }
142     //more code...
149     return NULL;
150 }

// Advance the timer TI to the current value of gettime().
// timer_update() is called once per unit of elapsed time.
266 void
267 skynet_updatetime(void) {
268     uint64_t cp = gettime();  // presumably in 10 ms units -- TODO confirm against gettime()
269     if(cp < TI->current_point) {
271         TI->current_point = cp;  // clock went backwards: resynchronise without ticking
272     } else if (cp != TI->current_point) {
273         uint32_t diff = (uint32_t)(cp - TI->current_point);
274         TI->current_point = cp;
275         TI->current += diff;
276         int i;
277         for (i=0;i<diff;i++) {
278             timer_update(TI);  // one call per elapsed tick
279         }
280     }
281 }



242     struct timeval tv;
243     gettimeofday(&tv, NULL);
244     *sec = tv.tv_sec;
245     *cs = tv.tv_usec / 10000;

258     struct timeval tv;
259     gettimeofday(&tv, NULL);
260     t = (uint64_t)tv.tv_sec * 100;
261     t += tv.tv_usec / 10000;
 // Payload stored directly after a timer_node: who to notify when the timer expires.
 31 struct timer_event {
 32     uint32_t handle; // service handle
 33     int session;  // session of the corresponding message, e.g. a coroutine
 34 };
 // Singly linked timer entry; timer_add allocates its payload right behind this header.
 36 struct timer_node {
 37     struct timer_node *next;
 38     uint32_t expire; // expiry time, set by timer_add as requested delay + T->time
 39 };
 // List of timer nodes; timer_execute treats head.next as the first real node.
 41 struct link_list {
 42     struct timer_node head;
 43     struct timer_node *tail;
 44 };
 // Timer state: a "near" array of lists plus four levels of lists for later expiries.
 46 struct timer {
 47     struct link_list near[TIME_NEAR];  // nodes that are about to be processed
 48     struct link_list t[4][TIME_LEVEL];  // nodes layered by how far away their expiry is
 49     struct spinlock lock;
 50     uint32_t time;  // number of 10 ms ticks accumulated so far
 51     uint32_t starttime;
 52     uint64_t current;  // skynet_updatetime adds each elapsed diff to this
 53     uint64_t current_point;  // last gettime() value seen by skynet_updatetime
 54 };


401     int session = skynet_context_newsession(context);

// Schedule a timeout notification for service `handle` carrying `session`.
// time <= 0: a PTYPE_RESPONSE message is pushed to the service immediately.
// time > 0: a timer_event is registered with the timer TI.
// Returns `session`, or -1 when the immediate push fails.
211 int
212 skynet_timeout(uint32_t handle, int time, int session) {
213     if (time <= 0) {
214         struct skynet_message message;
215         message.source = 0;
216         message.session = session;
217         message.data = NULL;
218         message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;   // response message type
220         if (skynet_context_push(handle, &message)) {
221             return -1; // pushing onto the service's message queue failed
222         }
223     } else {
224         struct timer_event event;
225         event.handle = handle;
226         event.session = session;
227         timer_add(TI, &event, sizeof(event), time);  // the event is copied into the new node
228     }
230     return session;
231 }


 // Allocate a timer node with `sz` bytes of payload copied from `arg` directly after
 // the node header, then insert it under T's lock with expiry `time` ticks from now.
 95 static void
 96 timer_add(struct timer *T,void *arg,size_t sz,int time) {
 97     struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
 98     memcpy(node+1,arg,sz);  // payload lives immediately behind the node
100     SPIN_LOCK(T);
102         node->expire=time+T->time;  // this addition may overflow
103         add_node(T,node);
105     SPIN_UNLOCK(T);
106 }

在挨着struct timer_node地址分配struct timer_event,并保存arg数据;然后加锁,设置超时时间,添加节点,解锁;

 // Link `node` into the list that matches how far node->expire is from T->time.
 // If the two values differ only in their low TIME_NEAR_SHIFT bits, the node goes into
 // near[]. Otherwise it goes into level i of t[][], where i is the first level whose
 // mask makes both values compare equal (level 3 if none of levels 0..2 match).
 74 static void
 75 add_node(struct timer *T,struct timer_node *node) {
 76     uint32_t time=node->expire;
 77     uint32_t current_time=T->time;
 79     if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) {
 80         link(&T->near[time&TIME_NEAR_MASK],node);  // slot chosen by the low 8 bits of expire
 81     } else {
 82         int i;
 83         uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT;
 84         for (i=0;i<3;i++) {
 85             if ((time|(mask-1))==(current_time|(mask-1))) {
 86                 break;
 87             }
 88             mask <<= TIME_LEVEL_SHIFT;  // widen the compared window by one level
 89         }
 // slot within level i is the TIME_LEVEL_SHIFT-bit field of expire belonging to that level
 91         link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);    
 92     }
 93 }

 // Sizing constants for the timer: 256 near slots and 64 slots per level.
 24 #define TIME_NEAR_SHIFT 8
 25 #define TIME_NEAR (1 << TIME_NEAR_SHIFT)
 26 #define TIME_LEVEL_SHIFT 6
 27 #define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)
 28 #define TIME_NEAR_MASK (TIME_NEAR-1)  //0xff
 29 #define TIME_LEVEL_MASK (TIME_LEVEL-1) //0x3f (64-1; the original note said 0x1f)

date -r 1500000000时间为2017年 7月14日 星期五 10时40分00秒 CST;
2017年 7月14日 星期五 10时44分15秒 CST,最迟是四分钟十几秒;代码82〜91行,是那些超时时间比这个久的,就会按分层,这样对应:0层->高18位(0x3fff),1层->高12位(0xfffff),2层->高6位(0x3ffffff),3层->是剩下的,最后找到适合的i位置;当然这里的time不是真正的时间,而是自程序启动后,总共走了多少个tick;



// Process one tick of timer T under its lock: dispatch anything already due,
// advance the time by one, then dispatch what has become due.
172 static void
173 timer_update(struct timer *T) {
174     SPIN_LOCK(T);
176     // try to dispatch timeout 0 (rare condition)
177     timer_execute(T);
179     // shift time first, and then dispatch timer message
180     timer_shift(T);
182     timer_execute(T);
184     SPIN_UNLOCK(T);
185 }
// Dispatch every node in the near[] list for the current tick.
// Called with T's lock held; the lock is released around dispatch_list and
// re-acquired before the list is checked again.
159 static inline void
160 timer_execute(struct timer *T) {
161     int idx = T->time & TIME_NEAR_MASK;
163     while (T->near[idx].head.next) {
164         struct timer_node *current = link_clear(&T->near[idx]);  // detach the list
165         SPIN_UNLOCK(T);
166         // dispatch_list don't need lock T
167         dispatch_list(current);
168         SPIN_LOCK(T);
169     }
170 }

// Walk a detached list of expired nodes: push a PTYPE_RESPONSE message with the
// node's session to the node's service, then free the node.
// `current` must be non-NULL, since the body runs before the first check.
141 static inline void
142 dispatch_list(struct timer_node *current) {
143     do {
144         struct timer_event * event = (struct timer_event *)(current+1);  // payload behind the node
145         struct skynet_message message;
146         message.source = 0;
147         message.session = event->session;
148         message.data = NULL;
149         message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
151         skynet_context_push(event->handle, &message);  // return value is not checked
153         struct timer_node * temp = current;
154         current=current->next;  // read next before freeing the node
155         skynet_free(temp);
156     } while (current);
157 }


// Detach the list at t[level][idx] and re-insert each node through add_node,
// which picks a list based on the current T->time.
108 static void
109 move_list(struct timer *T, int level, int idx) {
110     struct timer_node *current = link_clear(&T->t[level][idx]);
111     while (current) {
112         struct timer_node *temp=current->next;  // saved before add_node relinks the node
113         add_node(T,current);
114         current=temp;
115     }
116 }
// Advance T->time by one tick and redistribute nodes from the level lists when a
// level boundary is crossed. If the counter wraps to 0, t[3][0] is redistributed.
// Otherwise, while the low bits covered by `mask` are all zero, the level's index
// field is examined: the first non-zero field selects the list to move, and a zero
// field moves the check up one level.
118 static void
119 timer_shift(struct timer *T) {
120     int mask = TIME_NEAR;
121     uint32_t ct = ++T->time;
122     if (ct == 0) {
123         move_list(T, 3, 0);
124     } else {
125         uint32_t time = ct >> TIME_NEAR_SHIFT;
126         int i=0;
128         while ((ct & (mask-1))==0) {
129             int idx=time & TIME_LEVEL_MASK;  // this level's slot index
130             if (idx!=0) {
131                 move_list(T, i, idx);
132                 break;
133             }
134             mask <<= TIME_LEVEL_SHIFT;
135             time >>= TIME_LEVEL_SHIFT;
136             ++i;
137         }
138     }
139 }

由于只处理near数组中的节点,当过了一定的tick数后,在T->t[level]可能有快超时的,故这里需要做个跨level的过程并处理Q问题;这里针对T->time为0要特殊处理;当(T->time & (mask-1))==0才需要处理跨level的情况;每TIME_LEVEL_MASK比较,找到后就移动,为什么这样做?因为在添加节点到相应的level上时,正因为(time|(mask-1))==(current_time|(mask-1))才找到适合的level,而这里是个相反的过程;





// State of the socket server. Only part of the struct is shown in this excerpt.
100 struct socket_server {
 101     int recvctrl_fd;  // read end of the control pipe
 102     int sendctrl_fd;  // write end of the control pipe
 104     poll_fd event_fd;  // poll handle obtained from sp_create()
 105     int alloc_id;
 106     int event_n;  // number of events returned by the last sp_wait()
 107     int event_index;  // index of the next entry of ev[] to process
 109     struct event ev[MAX_EVENT];
 110     struct socket slot[MAX_SOCKET];
 111     char buffer[MAX_INFO];
 113     fd_set rfds;
 114 };  //more code


 // Create a socket server: a poll handle, a control pipe whose read end is registered
 // with the poll handle, and a slot table with every entry marked invalid.
 // Error handling is elided in this excerpt. Returns the new server.
 328 struct socket_server *
 329 socket_server_create() {
 330     int i;
 331     int fd[2];
 332     poll_fd efd = sp_create();
         // check efd
 337     if (pipe(fd)) {
             //check fd
 341     }
 342     if (sp_add(efd, fd[0], NULL)) {  // watch the pipe's read end for readability (blocking)
             //check add event
 349     }
 351     struct socket_server *ss = MALLOC(sizeof(*ss));
 352     ss->event_fd = efd;
 353     ss->recvctrl_fd = fd[0];
 354     ss->sendctrl_fd = fd[1];
 356     ss->checkctrl = 1;
 357     for (i=0;i<MAX_SOCKET;i++) { // initialise the per-connection socket slots
 358         struct socket *s = &ss->slot[i];
 359         s->type = SOCKET_TYPE_INVALID;
 360         clear_wb_list(&s->high);
 361         clear_wb_list(&s->low);
 362     }
 363     ss->alloc_id = 0;
 364     ss->event_n = 0;
 365     ss->event_index = 0;
 367     FD_ZERO(&ss->rfds);
 370     return ss;
 371 }


 // Signal the monitor's condition variable when the number of sleeping workers
 // is at least m->count - busy.
 55 static void
 56 wakeup(struct monitor *m, int busy) {
 57     if (m->sleep >= m->count - busy) {
 58         // signal sleep worker, "spurious wakeup" is harmless
 59         pthread_cond_signal(&m->cond);
 60     }
 61 }
 // Socket thread entry point: polls in a loop until skynet_socket_poll() returns 0.
 // A negative result polls again at once; a positive result first calls wakeup(m,0).
 63 static void *
 64 thread_socket(void *p) {
 65     struct monitor * m = p;
 67     for (;;) {
 68         int r = skynet_socket_poll();
 69         if (r==0)
 70             break;
 71         if (r<0) {
 73             continue;
 74         }
 75         wakeup(m,0);
 76     }
 77     return NULL;
 78 }


 // Run one socket_server_poll() step and forward its result as a socket message.
 // Returns 0 on SOCKET_EXIT, -1 while `more` is still set after the poll, 1 otherwise.
 // Several switch cases are elided in this excerpt.
 73 int
 74 skynet_socket_poll() {
 75     struct socket_server *ss = SOCKET_SERVER;
 77     struct socket_message result;
 78     int more = 1;
 79     int type = socket_server_poll(ss, &result, &more);
 80     switch (type) {
 81     case SOCKET_EXIT:
 82         return 0;
 83     case SOCKET_DATA:
 84         forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
 85         break;
 86     case SOCKET_CLOSE:
 87         forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
 88         break;
 89     case SOCKET_OPEN:
 90         forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
 91         break;
       //more code...
108     if (more) {
109         return -1;
110     }   
111     return 1;
112 }

  // Result types that skynet_socket_poll switches on after socket_server_poll returns.
  6 #define SOCKET_DATA 0
  7 #define SOCKET_CLOSE 1
  8 #define SOCKET_OPEN 2
  9 #define SOCKET_ACCEPT 3
 10 #define SOCKET_ERR 4
 11 #define SOCKET_EXIT 5
 12 #define SOCKET_UDP 6
 13 #define SOCKET_WARNING 7


 // Wrap `result` in a heap-allocated skynet_socket_message and push it as a
 // PTYPE_SOCKET message to the service identified by result->opaque.
 // Parts of the function, including the use of `padding`, are elided in this excerpt.
 33 static void
 34 forward_message(int type, bool padding, struct socket_message * result) {
 35     struct skynet_socket_message *sm;
 36     size_t sz = sizeof(*sm);
 48     sm = (struct skynet_socket_message *)skynet_malloc(sz);
 49     sm->type = type;
 50     sm->id = result->id;
 51     sm->ud = result->ud;
 56     sm->buffer = result->data;  // pointer is handed over, the data is not copied
 59     struct skynet_message message;
 60     message.source = 0;
 61     message.session = 0;
 62     message.data = sm;
 63     message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);  // size with the type OR'ed in
 65     if (skynet_context_push((uint32_t)result->opaque, &message)) {
            //free buffer
 70     }   
 71 }   //more code...

 // Result record filled in by the socket server and consumed by forward_message.
 17 struct socket_message {
 18     int id;
 19     uintptr_t opaque;  // forward_message casts this to the uint32_t handle it pushes to
 20     int ud; // for accept, ud is new connection id ; for data, ud is size of data 
 21     char * data;
 22 };



1353 int
1354 socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
1355     for (;;) {
1356         if (ss->checkctrl) {
1357             if (has_cmd(ss)) {
1366             }
1367         }
// Send a control request through the pipe's write end. header[6] holds the command
// type and header[7] the payload length; len+2 bytes are written starting at header[6].
// A failed write is retried; the handling for errors other than EINTR is elided here.
1457 static void
1458 send_request(struct socket_server *ss, struct request_package *request, char type, int len) {
1459     request->header[6] = (uint8_t)type;
1460     request->header[7] = (uint8_t)len;
1461     for (;;) {
1462         ssize_t n = write(ss->sendctrl_fd, &request->header[6], len+2);  // write the command to the pipe; fd[0] then becomes readable and the poll reports a read event
1463         if (n<0) {
1464             if (errno != EINTR) {
1466             }
1467             continue;
1468         }
1470         return;
1471     }
1472 }

比如,在lua层,listen一个ip/port的时候,并传backlog大小,会调用llisten接口,然后调用skynet_socket_listen(ctx, host,port,backlog);接着调用socket_server_listen(SOCKET_SERVER, source, host, port, backlog);

// Open a listening fd with do_listen(), reserve a socket id and send an 'L' request
// carrying opaque/id/fd through the control pipe.
// Returns the reserved id, -1 if do_listen() fails, or the negative reserve_id()
// result (the fd is closed in that case).
1676 int 
1677 socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
1678     int fd = do_listen(addr, port, backlog);
1679     if (fd < 0) {
1680         return -1;
1681     }
1682     struct request_package request;
1683     int id = reserve_id(ss);
1684     if (id < 0) {
1685         close(fd); 
1686         return id;
1687     }
1688     request.u.listen.opaque = opaque;  // handle
1689     request.u.listen.id = id;
1690     request.u.listen.fd = fd;
1691     send_request(ss, &request, 'L', sizeof(request.u.listen));
1692     return id;
1693 } 


1092 // return type
1093 static int
1094 ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
1095     int fd = ss->recvctrl_fd;
1096     // the length of message is one byte, so 256+8 buffer size is enough.
1097     uint8_t buffer[256];
1098     uint8_t header[2];
1099     block_readpipe(fd, header, sizeof(header));
1100     int type = header[0];
1101     int len = header[1];
1102     block_readpipe(fd, buffer, len);
1103     // ctrl command only exist in local fd, so don't worry about endian.
1104     switch (type) {
1105     case 'S':
1106         return start_socket(ss,(struct request_start *)buffer, result);
1107     case 'B':
1108         return bind_socket(ss,(struct request_bind *)buffer, result);
1109     case 'L':
1110         return listen_socket(ss,(struct request_listen *)buffer, result);
         //more code...
 // Handle an 'L' control request: set up the socket slot for the listening fd and
 // mark it SOCKET_TYPE_PLISTEN. The visible code does not call sp_add on the fd.
 // Returns -1 on success; the failure path is elided in this excerpt.
 867 static int
 868 listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message      *result) {
 869     int id = request->id;
 870     int listen_fd = request->fd;
 871     struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false); // initialise the data for the listening socket
 872     if (s == NULL) {
 873         goto _failed;
 874     }
 875     s->type = SOCKET_TYPE_PLISTEN;
 876     return -1;
 877 _failed:
         //do failed logic...
 886 }

上面还没有监听listen socket,是在start_socket接口中完成的:

 940 static int
 941 start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *re     sult) {
 942     int id = request->id;
 943     result->id = id;
 944     result->opaque = request->opaque;
 945     result->ud = 0;
 946     result->data = NULL;
 947     struct socket *s = &ss->slot[HASH_ID(id)];
 948     if (s->type == SOCKET_TYPE_INVALID || s->id !=id) {
 949         result->data = "invalid socket";
 950         return SOCKET_ERR;
 951     }
 952     struct socket_lock l;
 953     socket_lock_init(s, &l);
 954     if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
 955         if (sp_add(ss->event_fd, s->fd, s)) {
 956             force_close(ss, s, &l, result);
 957             result->data = strerror(errno);
 958             return SOCKET_ERR;
 959         }
 960         s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN     ;
 961         s->opaque = request->opaque;
 962         result->data = "start";
 963         return SOCKET_OPEN;



 // Wait on `efd` with epoll_wait (timeout -1) for up to `max` events and translate
 // them into e[0..n-1]. Returns the epoll_wait result unchanged.
 52 static int   
 53 sp_wait(int efd, struct event *e, int max) {
 54     struct epoll_event ev[max];
 55     int n = epoll_wait(efd , ev, max, -1);
 56     int i;
 57     for (i=0;i<n;i++) {
 58         e[i].s = ev[i].data.ptr;  // pointer registered with the fd
 59         unsigned flag = ev[i].events;
 60         e[i].write = (flag & EPOLLOUT) != 0;
 61         e[i].read = (flag & (EPOLLIN | EPOLLHUP)) != 0;  // a hang-up is reported as readable
 62         e[i].error = (flag & EPOLLERR) != 0;
 63     }
 65     return n;
 66 }

1368         if (ss->event_index == ss->event_n) {
1369             ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
1370             ss->checkctrl = 1;
1371             if (more) { 
1372                 *more = 0;
1373             }
1374             ss->event_index = 0;
1375             if (ss->event_n <= 0) {
1376                 ss->event_n = 0;
1377                 if (errno == EINTR) { 
1378                     continue;
1379                 } 
1380                 return -1;
1381             }
1382         }

这里把事件都拷贝到了struct event *e上面,在外层for循环处理;当if (ss->event_index == ss->event_n)成立时表示本次sp_wait返回的事件集都处理完毕;

1383         struct event *e = &ss->ev[ss->event_index++];
1384         struct socket *s = e->s;
1385         if (s == NULL) {
1386             // dispatch pipe message at beginning
1387             continue;
1388         }
1389         struct socket_lock l;
1390         socket_lock_init(s, &l);
1391         switch (s->type) {
1392         case SOCKET_TYPE_CONNECTING: //已连接的会在注册可写事件前,先判断有没有更多的数据要发送
1393             return report_connect(ss, s, &l, result);
1394         case SOCKET_TYPE_LISTEN: {
1395             int ok = report_accept(ss, s, result);
1396             if (ok > 0) {
1397                 return SOCKET_ACCEPT;
1398             } if (ok < 0 ) {
1399                 return SOCKET_ERR;
1400             }
1401             // when ok == 0, retry
1402             break;
1403         }

 // Non-zero when the socket has nothing queued in either write list.
 675 static inline int
 676 send_buffer_empty(struct socket *s) {
 677     return (s->high.head == NULL && s->low.head == NULL);
 678 } // true when both the high- and low-priority send lists are empty

 // Non-zero when both write lists are empty, there is no dw_buffer, and the low
 // 16 bits of s->sending are zero.
 888 static inline int
 889 nomore_sending_data(struct socket *s) {
 890     return send_buffer_empty(s) && s->dw_buffer == NULL && (s->sending & 0xffff) == 0;
 891 }

然后根据创建socket fd时的状态,比如是监听套接字,还是默认可读写等,这里举例有连接到来:

1289 static int
1290 report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
1291     union sockaddr_all u;
1292     socklen_t len = sizeof(u);
1293     int client_fd = accept(s->fd, &u.s, &len);
1294     if (client_fd < 0) {
1295          //do failed logic...
1304     }
1305     int id = reserve_id(ss);
1306     if (id < 0) {
1307         close(client_fd);
1308         return 0;
1309     }
1310     socket_keepalive(client_fd);
1311     sp_nonblocking(client_fd); //设置非阻塞
1312     struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
1313     if (ns == NULL) {
1314         close(client_fd);
1315         return 0;
1316     }
1317     ns->type = SOCKET_TYPE_PACCEPT;
1318     result->opaque = s->opaque;  //服务句柄
1319     result->id = s->id;   //索引id
1320     result->ud = id;
1321     result->data = NULL;
1322     //more code...

以上是accept连接,然后分配struct socket存储结构,设置非阻塞,初始化数据,由gate服务处理;然后由lua层的业务逻辑调用start_socket来决定对应套接字是否要监听读写网络事件等。

1407         default:
1408             if (e->read) {
1409                 int type;
1410                 if (s->protocol == PROTOCOL_TCP) {
1411                     type = forward_message_tcp(ss, s, &l, result);
1412                 } else {
1413                     type = forward_message_udp(ss, s, &l, result);
1414                     if (type == SOCKET_UDP) {
1415                         // try read again
1416                         --ss->event_index;
1417                         return SOCKET_UDP;
1418                     }
1419                 }
1420                 if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
1421                     // Try to dispatch write message next step if write flag set.
1422                     e->read = false;
1423                     --ss->event_index;//这里防止写事件饿死,故这次读事件处理一次后让出给同一个event_index的写事件
1424                 }
1425                 if (type == -1)
1426                     break;
1427                 return type;
1428             }
1429             if (e->write) {
1430                 int type = send_buffer(ss, s, &l, result);
1431                 if (type == -1)
1432                     break;
1433                 return type;
1434             }

以上大概分析下,如果是可读,根据协议类型,这里说明tcp的情况,把读到的消息塞到struct socket_message中:

1189     result->opaque = s->opaque;
1190     result->id = s->id;
1191     result->ud = n;   //buffer长度
1192     result->data = buffer;



 734     if (s->dw_buffer) {
 735         // add direct write buffer before high.head
 736         struct write_buffer * buf = MALLOC(SIZEOF_TCPBUFFER);
 737         struct send_object so;
 738         buf->userobject = send_object_init(ss, &so, (void *)s->dw_buffer, s->dw_size);
 739         buf->ptr = (char*)so.buffer+s->dw_offset;
 740         buf->sz = so.sz - s->dw_offset;
 741         buf->buffer = (void *)s->dw_buffer;
 742         s->wb_size+=buf->sz;
 743         if (s->high.head == NULL) {
 744             s->high.head = s->high.tail = buf;
 745             buf->next = NULL;
 746         } else {
 747             buf->next = s->high.head;
 748             s->high.head = buf;
 749         } //在high头插入未完成的write_buffer
 750         s->dw_buffer = NULL;
 751     }
 752     int r = send_buffer_(ss,s,l,result);

680 /*
681 Each socket has two write buffer list, high priority and low priority.
683 1. send high list as far as possible.
684 2. If high list is empty, try to send low list.
685 3. If low list head is uncomplete (send a part before), move the head of low list to empty high list (call raise_uncomplete) .
686 4. If two lists are both empty, turn off the event. (call check_close)
687 */


这里举例下如何在主场景服务向client发消息,代理client的为agent,先在lua层调用skynet.send后,把要发送的消息压入agent服务队列,工作线程处理到agent服务队列时,把消息传到lua层,agent的服务做了一些逻辑,再socket.write到对应的skynet_socket_send,并由网络模块发送。lua层使用的fd和c层的socket fd不是同一个,而是c层分配的索引id。





