- fork进程
- IO复用
- 线程
多进程
主进程监听,在循环中接受连接请求,当连接建立后,fork一个子进程,在子进程中进行处理。
主进程:listen -> while(1) -> accept -> fork -> close connfd
子进程:close listenfd -> handle -> close connfd
server:
[root@linuxkit-00155ddc0103 inet]# ./echoserverfork 6667
Parent: waitting...
name[localhost] port[38276]connected! Child No.1 will handle it!
Parent: waitting...
server recived 6 bytes
name[localhost] port[38280]connected! Child No.2 will handle it!
Parent: waitting...
server recived 7 bytes
client1:
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
first
first
client2:
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
second
second
code
#include <errno.h>
#include "mynet.h"
int open_listenfd(char *port);
void echo(int connfd);
/*
 * SIGCHLD handler: reap every child that has already terminated.
 *
 * waitpid() is called with WNOHANG in a loop so that all children that
 * have exited are collected without ever blocking in the handler.
 * errno is saved on entry and restored on exit: once no child is left,
 * waitpid() fails and sets errno, which would otherwise leak into
 * whatever code the signal interrupted.
 *
 * sig: signal number (unused).
 */
void sigchld_handler(int sig)
{
    int saved_errno = errno;

    (void)sig;
    while (waitpid(-1, NULL, WNOHANG) > 0) {
        /* keep reaping */
    }
    errno = saved_errno;
}
/*
 * Concurrent echo server, one child process per connection.
 *
 * Usage: <prog> <port>
 *
 * The parent accepts connections in a loop and forks a child for each
 * one. The child closes its copy of the listening socket, serves the
 * client with echo() and exits. The parent closes its copy of the
 * connected socket, so the connection ends when the child is done.
 */
int main(int argc, char **argv)
{
    int listenfd, connfd, cno = 1;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    char client_hostname[MAXLINE], client_port[MAXLINE];
    pid_t pid;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    signal(SIGCHLD, sigchld_handler);

    listenfd = open_listenfd(argv[1]);
    if (listenfd < 0) {
        /* a negative value can never be a usable descriptor */
        fprintf(stderr, "cannot listen on port %s\n", argv[1]);
        exit(1);
    }

    while (1) {
        printf("Parent: waitting...\n");
        clientlen = sizeof(struct sockaddr_storage);
        connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
        if (connfd < 0) {
            /* nothing was accepted (e.g. the call was interrupted by the
             * SIGCHLD handler): do not fork a child for an invalid fd */
            perror("accept");
            continue;
        }

        if (getnameinfo((SA *)&clientaddr, clientlen, client_hostname, MAXLINE,
                        client_port, MAXLINE, 0) != 0) {
            /* lookup failed: the buffers may not hold valid strings */
            snprintf(client_hostname, MAXLINE, "unknown");
            snprintf(client_port, MAXLINE, "?");
        }
        printf("name[%s] port[%s]connected! Child No.%d will handle it!\n",
               client_hostname, client_port, cno++);

        pid = fork();
        if (pid == 0) { /* child */
            close(listenfd);
            echo(connfd);
            close(connfd);
            exit(0);
        }
        if (pid < 0)
            perror("fork"); /* nobody will serve this client; drop it below */

        /* parent: the child (if any) owns the connection now */
        close(connfd);
    }
    exit(0);
}
/*
 * Echo everything received on connfd back to the peer until the peer
 * closes the connection (read() returns 0) or an I/O error occurs.
 *
 * connfd: connected socket; the caller remains responsible for closing it.
 */
void echo(int connfd)
{
    ssize_t nread;
    char buf[MAXLINE];

    /* The count must stay signed: read() returns -1 on error, which must
     * end the loop instead of being used as a (huge) length for write(). */
    while ((nread = read(connfd, buf, MAXLINE)) > 0) {
        ssize_t done = 0;

        printf("server recived %d bytes\n", (int)nread);

        /* write() may accept fewer bytes than requested; send the rest */
        while (done < nread) {
            ssize_t nw = write(connfd, buf + done, (size_t)(nread - done));

            if (nw <= 0)
                return; /* peer gone or write error: stop serving */
            done += nw;
        }
    }
}
© 2019 GitHub, Inc.
IO复用
IO复用即使用select函数
#include <sys/select.h>
int select(int n, fd_set *fdset, NULL, NULL, NULL) //等待一组描述符准备好读
FD_ZERO(fd_set *fdset);
FD_CLR(int fd, fd_set *fdset);
FD_SET(int fd, fd_set *fdset);
FD_ISSET(int fd, fd_set *fdset);
stdio与connfd的复用
如果使用echoserver迭代版本的同时,要求能够响应服务器本身的stdin输入,则可以使用select。一个标准输入的fd一般是0,监听fd则是系统分配
- 将stdin和listenfd加入fdset
- 使用select等待是否有fd准备好
- 通过FD_ISSET查看哪个准备好并进行相应的处理
#include "mynet.h"
int open_listenfd(char *port);
void echo(int connfd);
void command(void);
/*
 * Iterative echo server that also reacts to lines typed on its own stdin.
 *
 * Usage: <prog> <port>
 *
 * select() waits on two descriptors, stdin and the listening socket.
 * A ready stdin is handled by command(); a ready listening socket is
 * accepted and served with echo() until that client disconnects. While
 * echo() is running, stdin input is not processed.
 */
int main(int argc, char **argv)
{
    int listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    char client_hostname[MAXLINE], client_port[MAXLINE];
    fd_set read_set, ready_set;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listenfd = open_listenfd(argv[1]);
    if (listenfd < 0) {
        /* a negative value can never be a usable descriptor */
        fprintf(stderr, "cannot listen on port %s\n", argv[1]);
        exit(1);
    }

    FD_ZERO(&read_set);               /* clear read_set */
    FD_SET(STDIN_FILENO, &read_set);  /* add stdin to read_set */
    FD_SET(listenfd, &read_set);      /* add listenfd to read_set */

    while (1) {
        /* select() overwrites its argument, so work on a copy */
        ready_set = read_set;
        if (select(listenfd + 1, &ready_set, NULL, NULL, NULL) < 0) {
            /* On failure the set is not updated, so both descriptors would
             * still look ready and the calls below could block forever. */
            perror("select");
            exit(1);
        }

        if (FD_ISSET(STDIN_FILENO, &ready_set)) {
            printf("stdin is ready.\n");
            command();
        }

        if (FD_ISSET(listenfd, &ready_set)) {
            printf("client connetted.\n");
            clientlen = sizeof(struct sockaddr_storage);
            connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
            if (connfd < 0) {
                /* nothing was accepted: do not serve an invalid descriptor */
                perror("accept");
                continue;
            }
            getnameinfo((SA *)&clientaddr, clientlen, client_hostname, MAXLINE,
                        client_port, MAXLINE, 0);
            echo(connfd);
            close(connfd);
        }
    }
    exit(0);
}
/*
 * Handle one line typed on the server's stdin by printing it back to
 * stdout. If fgets() returns NULL (end of input), the whole process
 * exits with status 0.
 */
void command(void)
{
    char line[MAXLINE];

    if (fgets(line, MAXLINE, stdin) != NULL) {
        printf("%s", line);
        return;
    }

    exit(0); /* EOF */
}
/*
 * Echo everything received on connfd back to the peer until the peer
 * closes the connection (read() returns 0) or an I/O error occurs.
 *
 * connfd: connected socket; the caller remains responsible for closing it.
 */
void echo(int connfd)
{
    ssize_t nread;
    char buf[MAXLINE];

    /* The count must stay signed: read() returns -1 on error, which must
     * end the loop instead of being used as a (huge) length for write(). */
    while ((nread = read(connfd, buf, MAXLINE)) > 0) {
        ssize_t done = 0;

        printf("server recived %d bytes\n", (int)nread);

        /* write() may accept fewer bytes than requested; send the rest */
        while (done < nread) {
            ssize_t nw = write(connfd, buf + done, (size_t)(nread - done));

            if (nw <= 0)
                return; /* peer gone or write error: stop serving */
            done += nw;
        }
    }
}
server behavior:
[root@linuxkit-00155ddc0103 inet]# ./select 6667
do something
stdin is ready.
do something
hello
stdin is ready.
hello
client connetted.
server recived 12 bytes
ccd //no response here because the server is still busy serving the connected client
stdin is ready. //the client has closed the connection now
ccd
f
stdin is ready.
f
stdin is ready. //ctrl+D here
client behavior:
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
i am client
i am client
基于I/O多路复用的并发事件驱动器
与之前类似,维护一个结构体pool,存储readset,readyset和clientfd
- server监听,初始化pool
- 进入循环,初始化readyset,select阻塞,等待fd准备好
- 若listenfd,添加client
- 对所有准备好的clientfd回射服务
server:
[root@linuxkit-00155ddc0103 inet]# ./echoserverselect 6667
localhost:38334 connected!
Server recived 5 (5 total) bytes on fd 4
Server recived 4 (9 total) bytes on fd 4
Server recived 7 (16 total) bytes on fd 4
localhost:38338 connected!
Server recived 4 (20 total) bytes on fd 5
Server recived 6 (26 total) bytes on fd 5
Server recived 2 (28 total) bytes on fd 5
Close connfd 4
Server recived 7 (35 total) bytes on fd 5
Close connfd 5
client1:
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
1234
1234
123
123
123456
123456
[root@linuxkit-00155ddc0103 inet]#
client2:
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
123
123
12345
12345
1
1
123456
123456
[root@linuxkit-00155ddc0103 inet]#
code
#include "mynet.h"
/* Bookkeeping for the select()-based server: the watched descriptors
 * plus the table of connected clients. */
typedef struct{
int maxfd;                  /* largest descriptor in read_set; select() is called with maxfd+1 */
fd_set read_set;            /* every descriptor being watched (listening socket and clients) */
fd_set ready_set;           /* copy of read_set handed to select(), which marks the ready ones */
int nready;                 /* value returned by select(); decremented as ready descriptors are handled */
int maxi;                   /* highest index of clientfd[] that has been used, -1 if none */
int clientfd[FD_SETSIZE];   /* connected client descriptors; -1 marks a free slot */
}pool;
void add_client(int connfd, pool *p);
void check_clients(pool *p);
void init_pool(int listenfd, pool *p);
int open_listenfd(char *port);
int byte_cnt = 0;
/*
 * Event-driven concurrent echo server built on select().
 *
 * Usage: <prog> <port>
 *
 * One loop iteration waits until the listening socket or any connected
 * client is readable, accepts a new client if there is one, and then
 * lets check_clients() echo one chunk for every client that is ready.
 */
int main(int argc, char **argv)
{
    int listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    char client_hostname[MAXLINE], client_port[MAXLINE];
    static pool pool;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listenfd = open_listenfd(argv[1]);
    if (listenfd < 0) {
        /* a negative value can never be a usable descriptor */
        fprintf(stderr, "cannot listen on port %s\n", argv[1]);
        exit(1);
    }
    init_pool(listenfd, &pool);

    while (1) {
        /* select() overwrites its argument, so work on a copy */
        pool.ready_set = pool.read_set;
        pool.nready = select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
        if (pool.nready < 0) {
            /* On failure the set is not updated, so every descriptor would
             * still look ready and accept()/read() could block forever. */
            perror("select");
            exit(1);
        }

        if (FD_ISSET(listenfd, &pool.ready_set)) {
            clientlen = sizeof(struct sockaddr_storage);
            connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
            if (connfd < 0) {
                /* nothing was accepted: never register an invalid fd */
                perror("accept");
                pool.nready--; /* the listening socket's event is consumed */
            } else {
                if (getnameinfo((SA *)&clientaddr, clientlen,
                                client_hostname, MAXLINE,
                                client_port, MAXLINE, 0) != 0) {
                    /* lookup failed: the buffers may not hold valid strings */
                    snprintf(client_hostname, MAXLINE, "unknown");
                    snprintf(client_port, MAXLINE, "?");
                }
                printf("%s:%s connected!\n", client_hostname, client_port);
                add_client(connfd, &pool);
            }
        }

        check_clients(&pool);
    }
    exit(0);
}
/*
 * Put the pool into its initial state: no clients, and the listening
 * socket as the only watched descriptor.
 *
 * listenfd: listening socket to watch.
 * p:        pool to initialise.
 */
void init_pool(int listenfd, pool *p)
{
    int slot = FD_SETSIZE;

    /* -1 marks a client slot as free */
    while (slot-- > 0)
        p->clientfd[slot] = -1;
    p->maxi = -1;

    FD_ZERO(&p->read_set);
    FD_SET(listenfd, &p->read_set);
    p->maxfd = listenfd;
}
/*
 * Register a newly accepted connection in the pool.
 *
 * connfd: descriptor returned by accept().
 * p:      pool to update.
 *
 * The descriptor is stored in the first free slot and added to read_set;
 * maxfd and maxi are raised when needed. If the descriptor cannot be
 * watched (outside the fd_set range) or no slot is free, an error is
 * printed and the connection is closed so the descriptor is not leaked.
 */
void add_client(int connfd, pool *p)
{
    int i;

    /* the listening socket's readiness has been consumed by the caller */
    p->nready--;

    /* FD_SET is only defined for descriptors in [0, FD_SETSIZE) */
    if (connfd < 0 || connfd >= FD_SETSIZE) {
        printf("Add client error! Descriptor %d cannot be used with select.\n",
               connfd);
        if (connfd >= 0)
            close(connfd);
        return;
    }

    for (i = 0; i < FD_SETSIZE; i++) {
        if (p->clientfd[i] < 0) {
            p->clientfd[i] = connfd;
            FD_SET(connfd, &p->read_set);
            /* only ever raise the bounds; lowering maxfd would hide
             * higher-numbered clients from select() */
            if (connfd > p->maxfd)
                p->maxfd = connfd;
            if (i > p->maxi)
                p->maxi = i;
            return;
        }
    }

    /* reached only when every slot is occupied */
    printf("Add client error! Beyond max client numbers.\n");
    close(connfd);
}
/*
 * Serve every connected client whose descriptor select() marked ready.
 *
 * p: pool whose ready_set and nready were just filled in by select().
 *
 * For each ready client one read() is performed: received data is echoed
 * back and added to the global byte_cnt; end-of-file or an I/O error
 * closes the connection and frees its slot. The scan stops early once
 * nready events have been handled.
 */
void check_clients(pool *p)
{
    int i, connfd, drop;
    ssize_t n;
    char buf[MAXLINE];

    for (i = 0; (i <= p->maxi) && (p->nready > 0); i++) {
        connfd = p->clientfd[i];
        if (connfd <= 0 || !FD_ISSET(connfd, &p->ready_set))
            continue;

        p->nready--;
        drop = 0;

        /* n must be tested for > 0: read() returns -1 on error, which
         * must not be counted or used as a length for write() */
        n = read(connfd, buf, MAXLINE);
        if (n > 0) {
            byte_cnt += (int)n;
            printf("Server recived %d (%d total) bytes on fd %d\n",
                   (int)n, byte_cnt, connfd);
            if (write(connfd, buf, (size_t)n) < 0)
                drop = 1; /* cannot reach the peer any more */
        } else {
            drop = 1;     /* 0: peer closed; -1: read error */
        }

        if (drop) {
            printf("Close connfd %d\n", connfd);
            close(connfd);
            FD_CLR(connfd, &p->read_set);
            p->clientfd[i] = -1;
        }
    }
}
多线程
server:
[root@linuxkit-00155ddc0103 inet]# ./echoserverthread 6667
name[localhost] port[38362]connected! Thread id:140562122241792 will serve it!
name[localhost] port[38366]connected! Thread id:140562113849088 will serve it!
server recived 6 bytes from fd 4
server recived 4 bytes from fd 4
server recived 7 bytes from fd 5
server recived 4 bytes from fd 5
client1:
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
12345
12345
123
123
client2:
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
654321
654321
321
321
注意传递connfd时是新申请空间,并在thread里释放
code
#include "mynet.h"
int open_listenfd(char *port);
void *thread(void *vargp);
void echo(int connfd);
/*
 * Concurrent echo server, one thread per connection.
 *
 * Usage: <prog> <port>
 *
 * Each accepted descriptor is copied into its own heap cell before being
 * handed to the new thread, so the next accept() cannot overwrite the
 * value before the thread has read it. The thread frees the cell.
 */
int main(int argc, char **argv)
{
    int listenfd, connfd, rc, *connfdp;
    socklen_t clientlen;
    pthread_t tid;
    struct sockaddr_storage clientaddr;
    char client_hostname[MAXLINE], client_port[MAXLINE];

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listenfd = open_listenfd(argv[1]);
    if (listenfd < 0) {
        /* a negative value can never be a usable descriptor */
        fprintf(stderr, "cannot listen on port %s\n", argv[1]);
        exit(1);
    }

    while (1) {
        clientlen = sizeof(struct sockaddr_storage);
        connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
        if (connfd < 0) {
            /* nothing was accepted: do not start a thread for it */
            perror("accept");
            continue;
        }

        /* allocate only after a successful accept so nothing leaks above */
        connfdp = malloc(sizeof *connfdp);
        if (connfdp == NULL) {
            perror("malloc");
            close(connfd);
            continue;
        }
        *connfdp = connfd;

        if (getnameinfo((SA *)&clientaddr, clientlen, client_hostname, MAXLINE,
                        client_port, MAXLINE, 0) != 0) {
            /* lookup failed: the buffers may not hold valid strings */
            snprintf(client_hostname, MAXLINE, "unknown");
            snprintf(client_port, MAXLINE, "?");
        }

        rc = pthread_create(&tid, NULL, thread, connfdp);
        if (rc != 0) {
            /* no thread took ownership: release both resources here */
            fprintf(stderr, "pthread_create failed (error %d)\n", rc);
            close(connfd);
            free(connfdp);
            continue;
        }

        /* specifier now matches the unsigned long long argument */
        printf("name[%s] port[%s]connected! Thread id:%llu will serve it!\n",
               client_hostname, client_port, (unsigned long long)tid);
    }
    exit(0);
}
/*
 * Thread routine serving a single client.
 *
 * vargp: pointer to a heap-allocated int holding the connected
 *        descriptor; it is freed here once the value has been copied.
 *
 * The thread detaches itself, echoes until echo() returns, closes the
 * connection and returns NULL.
 */
void *thread(void *vargp)
{
    int *fd_cell = vargp;
    int connfd = *fd_cell;

    pthread_detach(pthread_self());
    free(fd_cell);

    echo(connfd);
    close(connfd);

    return NULL;
}
/*
 * Echo everything received on connfd back to the peer until the peer
 * closes the connection (read() returns 0) or an I/O error occurs.
 *
 * connfd: connected socket; the caller remains responsible for closing it.
 */
void echo(int connfd)
{
    ssize_t nread;
    char buf[MAXLINE];

    /* The count must stay signed: read() returns -1 on error, which must
     * end the loop instead of being used as a (huge) length for write(). */
    while ((nread = read(connfd, buf, MAXLINE)) > 0) {
        ssize_t done = 0;

        printf("server recived %d bytes from fd %d\n", (int)nread, connfd);

        /* write() may accept fewer bytes than requested; send the rest */
        while (done < nread) {
            ssize_t nw = write(connfd, buf + done, (size_t)(nread - done));

            if (nw <= 0)
                return; /* peer gone or write error: stop serving */
            done += nw;
        }
    }
}
基于线程的事件驱动程序
也称作预线程化的方法
- 主线程将连接符connfd放入池中
- 工作线程将connfd取出并处理
- 向池中插入connfd和从池中取出connfd的操作都使用互斥量保护
- echo程序中的cnt也使用互斥量保护
server
[root@linuxkit-00155ddc0103 inet]# ./echoserver_pre_thread 6667
client[localhost:38376] connected! connfd 4!
server recived 7 (7 total) bytes on fd 4
client[localhost:38380] connected! connfd 5!
server recived 9 (16 total) bytes on fd 5
server recived 4 (20 total) bytes on fd 4
client1
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
123456
123456
123
123
client2
[root@linuxkit-00155ddc0103 inet]# ./echoclient localhost 6667
87654321
87654321
sbuf.c fd池,插入,取出操作
#include "sbuf.h"
/*
 * Initialise an empty bounded buffer with room for n items.
 *
 * sp: buffer to initialise.
 * n:  capacity in items.
 *
 * The three semaphores start as: mutex = 1 (unlocked), slots = n (all
 * slots free), items = 0 (nothing to remove yet).
 */
void sbuf_init(sbuf_t *sp, int n)
{
    sp->n = n;
    sp->front = 0;
    sp->rear = 0;
    sp->buf = calloc(n, sizeof(int));

    sem_init(&sp->mutex, 0, 1);
    sem_init(&sp->slots, 0, n);
    sem_init(&sp->items, 0, 0);
}
/*
 * Release the storage owned by the buffer.
 *
 * sp: buffer previously set up with sbuf_init().
 *
 * This is the function sbuf.h declares; it was previously defined only
 * under a misspelled name, leaving the declared one without a definition.
 */
void sbuf_deinit(sbuf_t *sp)
{
    free(sp->buf);
    sp->buf = NULL; /* guard against a later use or a second free */
}

/* Original (misspelled) name, kept so existing callers still link. */
void snuf_deinit(sbuf_t *sp)
{
    sbuf_deinit(sp);
}
/*
 * Append an item to the buffer, blocking while the buffer is full.
 *
 * sp:   shared buffer.
 * item: value to store.
 */
void sbuf_insert(sbuf_t *sp, int item)
{
    sem_wait(&sp->slots);  /* wait for a free slot */
    sem_wait(&sp->mutex);  /* lock the buffer */

    /* Keep rear inside [0, n): an ever-growing counter would eventually
     * overflow int and then produce a negative array index. The slot
     * sequence (1, 2, ..., n-1, 0, 1, ...) is unchanged. */
    sp->rear = (sp->rear + 1) % sp->n;
    sp->buf[sp->rear] = item;

    sem_post(&sp->mutex);  /* unlock the buffer */
    sem_post(&sp->items);  /* announce the new item */
}
/*
 * Remove and return the oldest item, blocking while the buffer is empty.
 *
 * sp: shared buffer.
 *
 * Returns the removed item.
 */
int sbuf_remove(sbuf_t *sp)
{
    int item;

    sem_wait(&sp->items);  /* wait for an available item */
    sem_wait(&sp->mutex);  /* lock the buffer */

    /* Keep front inside [0, n): an ever-growing counter would eventually
     * overflow int and then produce a negative array index. The slot
     * sequence (1, 2, ..., n-1, 0, 1, ...) is unchanged. */
    sp->front = (sp->front + 1) % sp->n;
    item = sp->buf[sp->front];

    sem_post(&sp->mutex);  /* unlock the buffer */
    sem_post(&sp->slots);  /* announce the freed slot */
    return item;
}
sbuf结构
#ifndef SBUF_H_
#define SBUF_H_
#include "mynet.h"
#include <semaphore.h>

/* Bounded circular buffer of ints shared between threads. */
typedef struct {
    int *buf;     /* storage for n items */
    int n;        /* capacity in items */
    int front;    /* advanced before each removal; item is read at front % n */
    int rear;     /* advanced before each insertion; item is stored at rear % n */
    sem_t mutex;  /* protects buf, front and rear */
    sem_t slots;  /* counts free slots */
    sem_t items;  /* counts stored items */
} sbuf_t;

/* Initialise sp as an empty buffer of capacity n.
 * (Previously declared as dbuf_init, a name that is never defined.) */
void sbuf_init(sbuf_t *sp, int n);
/* Release the storage owned by the buffer. */
void sbuf_deinit(sbuf_t *p);
/* Append item, blocking while the buffer is full. */
void sbuf_insert(sbuf_t *sp, int item);
/* Remove and return the oldest item, blocking while the buffer is empty. */
int sbuf_remove(sbuf_t *sp);
#endif
echo_cnt.c
#include "echo_cnt.h"
static int byte_cnt;
static sem_t mutex;
/*
 * One-time setup for echo_cnt(): zero the shared byte counter and create
 * the semaphore that guards it with an initial value of 1 (unlocked).
 */
static void init_echo_cnt(void)
{
    byte_cnt = 0;
    sem_init(&mutex, 0, 1);
}
/*
 * Echo data received on connfd back to the peer while keeping a running
 * total of bytes received across all threads.
 *
 * connfd: connected socket; the caller remains responsible for closing it.
 *
 * The shared counter and its semaphore are initialised exactly once via
 * pthread_once(). The counter update and the log line are done while
 * holding the semaphore. The function returns when the peer closes the
 * connection or an I/O error occurs.
 */
void echo_cnt(int connfd)
{
    ssize_t n;
    char buf[MAXLINE];
    static pthread_once_t once = PTHREAD_ONCE_INIT;

    pthread_once(&once, init_echo_cnt);

    /* n must be tested for > 0: read() returns -1 on error, which must
     * not be added to the total or used as a length for write() */
    while ((n = read(connfd, buf, MAXLINE)) > 0) {
        ssize_t done = 0;

        sem_wait(&mutex);
        byte_cnt += (int)n;
        printf("server recived %d (%d total) bytes on fd %d\n",
               (int)n, byte_cnt, connfd);
        sem_post(&mutex);

        /* write() may accept fewer bytes than requested; send the rest */
        while (done < n) {
            ssize_t nw = write(connfd, buf + done, (size_t)(n - done));

            if (nw <= 0)
                return; /* peer gone or write error: stop serving */
            done += nw;
        }
    }
}
主程序
#include "mynet.h"
#include "sbuf.h"
#include "echo_cnt.h"
#define NTHREADS 4
#define SBUFSIZE 16
int open_listenfd(char *port);
void *thread(void *vargp);
sbuf_t sbuf;
/*
 * Pre-threaded concurrent echo server.
 *
 * Usage: <prog> <port>
 *
 * NTHREADS worker threads are created up front. The main thread only
 * accepts connections and puts each connected descriptor into the shared
 * buffer sbuf; workers take descriptors out of it and serve them.
 */
int main(int argc, char **argv)
{
    int listenfd, connfd, i, rc, started = 0;
    socklen_t clientlen;
    pthread_t tid;
    struct sockaddr_storage clientaddr;
    char client_hostname[MAXLINE], client_port[MAXLINE];

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listenfd = open_listenfd(argv[1]);
    if (listenfd < 0) {
        /* a negative value can never be a usable descriptor */
        fprintf(stderr, "cannot listen on port %s\n", argv[1]);
        exit(1);
    }

    sbuf_init(&sbuf, SBUFSIZE);
    for (i = 0; i < NTHREADS; i++) {
        rc = pthread_create(&tid, NULL, thread, NULL);
        if (rc != 0)
            fprintf(stderr, "pthread_create failed (error %d)\n", rc);
        else
            started++;
    }
    if (started == 0) {
        /* without any worker, queued connections would never be served */
        exit(1);
    }

    while (1) {
        clientlen = sizeof(struct sockaddr_storage);
        connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
        if (connfd < 0) {
            /* nothing was accepted: never queue an invalid descriptor */
            perror("accept");
            continue;
        }
        if (getnameinfo((SA *)&clientaddr, clientlen, client_hostname, MAXLINE,
                        client_port, MAXLINE, 0) != 0) {
            /* lookup failed: the buffers may not hold valid strings */
            snprintf(client_hostname, MAXLINE, "unknown");
            snprintf(client_port, MAXLINE, "?");
        }
        sbuf_insert(&sbuf, connfd);
        printf("client[%s:%s] connected! connfd %d!\n",
               client_hostname, client_port, (int)connfd);
    }
    exit(0);
}
/*
 * Worker thread routine.
 *
 * vargp: unused.
 *
 * The thread detaches itself, then loops forever: take one connected
 * descriptor out of the shared buffer, serve it with echo_cnt(), and
 * close it.
 */
void *thread(void *vargp)
{
    int connfd;

    pthread_detach(pthread_self());

    for (;;) {
        connfd = sbuf_remove(&sbuf);
        echo_cnt(connfd);
        close(connfd);
    }

    return NULL;
}