前言
Socket在實際系統(tǒng)程序開發(fā)當中,應(yīng)用非常廣泛,也非常重要。實際應(yīng)用中服務(wù)器經(jīng)常需要支持多個客戶端連接,實現(xiàn)高并發(fā)服務(wù)器模型顯得尤為重要。高并發(fā)服務(wù)器從簡單的循環(huán)服務(wù)器模型處理少量網(wǎng)絡(luò)并發(fā)請求,演進到解決C10K,C10M問題的高并發(fā)服務(wù)器模型。本文通過一個簡單的多線程模型,帶領(lǐng)大家學(xué)習(xí)如何自己實現(xiàn)一個簡單的并發(fā)服務(wù)器。
C/S架構(gòu)
服務(wù)器-客戶機,即Client-Server(C/S)結(jié)構(gòu)。C/S結(jié)構(gòu)通常采取兩層結(jié)構(gòu)。服務(wù)器負責數(shù)據(jù)的管理,客戶機負責完成與用戶的交互任務(wù)。
在C/S結(jié)構(gòu)中,應(yīng)用程序分為兩部分:服務(wù)器部分和客戶機部分。服務(wù)器部分是多個用戶共享的信息與功能,執(zhí)行后臺服務(wù),如控制共享數(shù)據(jù)庫的操作等;客戶機部分為用戶所專有,負責執(zhí)行前臺功能,在出錯提示、在線幫助等方面都有強大的功能,并且可以在子程序間自由切換。
如上圖所示:這是基于套接字實現(xiàn)客戶端和服務(wù)器相連的函數(shù)調(diào)用關(guān)系,socket API資料比較多,本文不再過多敘述。
pthread線程庫:(POSIX)
pthread線程庫是Linux下比較常用的一個線程庫,關(guān)于他的用法和特性大家可以自行搜索相關(guān)文章,下面只簡單介紹他的用法和編譯。
線程標識
線程有ID, 但不是系統(tǒng)唯一, 而是進程環(huán)境中唯一有效.線程的句柄是pthread_t類型, 該類型不能作為整數(shù)處理, 而是一個結(jié)構(gòu).下面介紹兩個函數(shù):
頭文件:?<pthread.h>
原型:?int?pthread_equal(pthread_t?tid1,?pthread_t?tid2);
返回值:?相等返回非0,?不相等返回0.
說明:?比較兩個線程ID是否相等.
頭文件:?<pthread.h>
原型:?pthread_t?pthread_self();
返回值:?返回調(diào)用線程的線程ID.
線程創(chuàng)建
在執(zhí)行中創(chuàng)建一個線程, 可以為該線程分配它需要做的工作(線程執(zhí)行函數(shù)), 該線程共享進程的資源. 創(chuàng)建線程的函數(shù)pthread_create()
頭文件:?<pthread.h>
原型:?int?pthread_create(pthread_t?*restrict?tidp,?const?pthread_attr_t?*restrict?attr,?void?*(start_rtn)(void),?void?*restrict?arg);
返回值:?成功則返回0,?否則返回錯誤編號.
參數(shù):
tidp:?指向新創(chuàng)建線程ID的變量,?作為函數(shù)的輸出.
attr:?用于定制各種不同的線程屬性,?NULL為默認屬性(見下).
start_rtn:?函數(shù)指針,?為線程開始執(zhí)行的函數(shù)名.該函數(shù)可以返回一個void?*類型的返回值,
而這個返回值也可以是其他類型,并由?pthread_join()獲取
arg:?函數(shù)的唯一無類型(void)指針參數(shù),?如要傳多個參數(shù),?可以用結(jié)構(gòu)封裝.
編譯
因為pthread的庫不是linux系統(tǒng)的庫,所以在進行編譯的時候要加上?????-lpthread
#?gcc?filename?-lpthread??//默認情況下gcc使用c庫,要使用額外的庫要這樣選擇使用的庫
常見的網(wǎng)絡(luò)服務(wù)器模型
本文結(jié)合自己的理解,主要以TCP為例,總結(jié)了幾種常見的網(wǎng)絡(luò)服務(wù)器模型的實現(xiàn)方式,并最終實現(xiàn)一個簡單的命令行聊天室。
單進程循環(huán)
單線進程循環(huán)原理就是主進程沒和客戶端通信,客戶端都要先連接服務(wù)器,服務(wù)器接受一個客戶端連接后從客戶端讀取數(shù)據(jù),然后處理并將處理的結(jié)果返還給客戶端,然后再接受下一個客戶端的連接請求。
優(yōu)點
單線程循環(huán)模型優(yōu)點是簡單、易于實現(xiàn),沒有同步、加鎖這些麻煩事,也沒有這些開銷。
缺點
- 阻塞模型,網(wǎng)絡(luò)請求串行處理;
- 沒有利用多核cpu的優(yōu)勢,網(wǎng)絡(luò)請求串行處理;
- 無法支持同時多個客戶端連接;
- 程序串行操作,服務(wù)器無法實現(xiàn)同時收發(fā)數(shù)據(jù)。
單線程IO復(fù)用
linux高并發(fā)服務(wù)器中常用epoll作為IO復(fù)用機制。線程將需要處理的socket讀寫事件都注冊到epoll中,當有網(wǎng)絡(luò)IO發(fā)生時,epoll_wait返回,線程檢查并處理到來socket上的請求。
優(yōu)點
- 實現(xiàn)簡單, 減少鎖開銷,減少線程切換開銷。
缺點
- 只能使用單核cpu,handle時間過長會導(dǎo)致整個服務(wù)掛死;
- 當有客戶端數(shù)量超過一定數(shù)量后,性能會顯著下降;
- 只適用高IO、低計算,handle處理時間短的場景。
多線程/多進程
多線程、多進程模型主要特點是每個網(wǎng)絡(luò)請求由一個進程/線程處理,線程內(nèi)部使用阻塞式系統(tǒng)調(diào)用,在線程的職能劃分上,可以由一個單獨的線程處理accept連接,其余線程處理具體的網(wǎng)絡(luò)請求(收包,處理,發(fā)包);還可以多個進程單獨listen、accept網(wǎng)絡(luò)連接。
優(yōu)點:
1、實現(xiàn)相對簡單;
2、利用到CPU多核資源。
缺點:
1、線程內(nèi)部還是阻塞的,舉個極端的例子,如果一個線程在handle的業(yè)務(wù)邏輯中sleep了,這個線程也就掛住了。
多線程/多進程IO復(fù)用
多線程、多進程IO服用模型,每個子進程都監(jiān)聽服務(wù),并且都使用epoll機制來處理進程的網(wǎng)絡(luò)請求,子進程 accept() 后將創(chuàng)建已連接描述符,然后通過已連接描述符來與客戶端通信。該機制適用于高并發(fā)的場景。
優(yōu)點:
- 支撐較高并發(fā)。
缺點:
- 異步編程不直觀、容易出錯
多線程劃分IO角色
多線程劃分IO角色主要功能有:一個accept thread處理新連接建立;一個IO thread pool處理網(wǎng)絡(luò)IO;一個handle thread pool處理業(yè)務(wù)邏輯。使用場景如:電銷應(yīng)用,thrift TThreadedSelectorServer。
優(yōu)點:
- 按不同功能劃分線程,各線程處理固定功能,效率更高
- 可以根據(jù)業(yè)務(wù)特點配置線程數(shù)量來性能調(diào)優(yōu)
缺點:
- 線程間通信需要引入鎖開銷
- 邏輯較復(fù)雜,實現(xiàn)難度大
小結(jié)
上面介紹了常見的網(wǎng)絡(luò)服務(wù)器模型,還有AIO、協(xié)程,甚至還有其他的變型,在這里不再討論。重要的是理解每種場景中所面臨的問題和每種模型的特點,設(shè)計出符合應(yīng)用場景的方案才是好方案。
多線程并發(fā)服務(wù)器模型
下面我們主要討論多線程并發(fā)服務(wù)器模型。
代碼結(jié)構(gòu)
并發(fā)服務(wù)器代碼結(jié)構(gòu)如下:
thread_func()
{
while(1)?{
recv(...);
process(...);
send(...);
}
close(...);
}
main(
socket(...);
bind(...);
listen(...);
while(1)?{
accept(...);
pthread_create();
}
}
由上可以看出,服務(wù)器分為兩部分:主線程、子線程。
主線程
main函數(shù)即主線程,它的主要任務(wù)如下:
- socket()創(chuàng)建監(jiān)聽套字;
- bind()綁定端口號和地址;
- listen()開啟監(jiān)聽;
- accept()等待客戶端的連接,
- 當有客戶端連接時,accept()會創(chuàng)建一個新的套接字new_fd;
- 主線程會創(chuàng)建子線程,并將new_fd傳遞給子線程。
子線程
- 子線程函數(shù)為thread_func(),他通過new_fd處理和客戶端所有的通信任務(wù)。
客戶端連接服務(wù)器詳細步驟
下面我們分步驟來看客戶端連接服務(wù)器的分步說明。
1. 客戶端連接服務(wù)器
- 服務(wù)器建立起監(jiān)聽套接字listen_fd,并初始化;
- 客戶端創(chuàng)建套接字fd1;
- 客戶端client1通過套接字fd1連接服務(wù)器的listen_fd;
2. 主線程創(chuàng)建子線程thread1
- server收到client1的連接請求后,accpet函數(shù)會返回一個新的套接字newfd1;
- 后面server與client1的通信就依賴newfd1,監(jiān)聽套接字listen_fd會繼續(xù)監(jiān)聽其他客戶端的連接;
- 主線程通過pthead_create()創(chuàng)建一個子線程thread1,并把newfd1傳遞給thread1;
- server與client1的通信就分別依賴newfd1、fd1。
- client1為了能夠?qū)崟r收到server發(fā)送的信息,同時還要能夠從鍵盤上讀取數(shù)據(jù),這兩個操作都是阻塞的,沒有數(shù)據(jù)的時候進程會休眠,所以必須創(chuàng)建子線程read_thread;
- client1的主線負責從鍵盤上讀取數(shù)據(jù)并發(fā)送給,子線程read_thread負責從server接受信息。
3. client2連接服務(wù)器
- 客戶端client2創(chuàng)建套接字fd2;
- 通過connect函數(shù)連接server的listen_fd;
4. 主線程創(chuàng)建子線程thread2
- server收到client2的連接請求后,accpet函數(shù)會返回一個新的套接字newfd2;
- 后面server與client2的通信就依賴newfd2,監(jiān)聽套接字listen_fd會繼續(xù)監(jiān)聽其他客戶端的連接;
- 主線程通過pthead_create()創(chuàng)建一個子線程thread2,并把newfd2傳遞給thread2;
- server與client1的通信就分別依賴newfd2、fd2。
- 同樣client2為了能夠?qū)崟r收到server發(fā)送的信息,同時還要能夠從鍵盤上讀取數(shù)據(jù)必須創(chuàng)建子線程read_thread;
- client1的主線負責從鍵盤上讀取數(shù)據(jù)并發(fā)送給,子線程read_thread負責從server接受信息。
由上圖可見,每一個客戶端連接server后,server都要創(chuàng)建一個專門的thread負責和該客戶端的通信;每一個客戶端和server都有一對固定的fd組合用于連接。
實例
好了,理論講完了,根據(jù)一口君的慣例,也繼承祖師爺?shù)慕陶d:talk is cheap,show you my code.不上代碼,只寫理論的文章都是在耍流氓。
本例的主要功能描述如下:
- 實現(xiàn)多個客戶端可以同時連接服務(wù)器;
- 客戶端可以實現(xiàn)獨立的收發(fā)數(shù)據(jù);
- 客戶端發(fā)送數(shù)據(jù)給服務(wù)器后,服務(wù)器會將數(shù)據(jù)原封不動返回給客戶端。
服務(wù)器端
/*********************************************
服務(wù)器程序??TCPServer.c
公眾號:一口Linux
*********************************************/
#include?<stdio.h>
#include?<sys/types.h>
#include?<sys/socket.h>
#include?<arpa/inet.h>
#include?<errno.h>
#include?<string.h>
#include?<pthread.h>
#include?<stdlib.h>
#define?RECVBUFSIZE?2048
void?*rec_func(void?*arg)
{
int?sockfd,new_fd,nbytes;
char?buffer[RECVBUFSIZE];
int?i;
new_fd?=?*((int?*)?arg);
free(arg);
while(1)
{
if((nbytes=recv(new_fd,buffer,?RECVBUFSIZE,0))==-1)
{
fprintf(stderr,"Read?Error:%sn",strerror(errno));
exit(1);
}
if(nbytes?==?-1)
{//客戶端出錯了?返回值-1
close(new_fd);
break;
}
if(nbytes?==?0)
{//客戶端主動斷開連接,返回值是0
close(new_fd);
break;
}
buffer[nbytes]='';
printf("I?have?received:%sn",buffer);
if(send(new_fd,buffer,strlen(buffer),0)==-1)
{
fprintf(stderr,"Write?Error:%sn",strerror(errno));
exit(1);
}
}
}
int?main(int?argc,?char?*argv[])
{
char?buffer[RECVBUFSIZE];
int?sockfd,new_fd,nbytes;
struct?sockaddr_in?server_addr;
struct?sockaddr_in?client_addr;
int?sin_size,portnumber;
char?hello[]="Hello!?Socket?communication?world!n";
pthread_t?tid;
int?*pconnsocke?=?NULL;
int?ret,i;
if(argc!=2)
{
fprintf(stderr,"Usage:%s?portnumberan",argv[0]);
exit(1);
}
/*端口號不對,退出*/
if((portnumber=atoi(argv[1]))<0)
{
fprintf(stderr,"Usage:%s?portnumberan",argv[0]);
exit(1);
}
/*服務(wù)器端開始建立socket描述符??sockfd用于監(jiān)聽*/
if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1)
{
fprintf(stderr,"Socket?error:%sna",strerror(errno));
exit(1);
}
/*服務(wù)器端填充?sockaddr結(jié)構(gòu)*/
bzero(&server_addr,sizeof(struct?sockaddr_in));
server_addr.sin_family?????=AF_INET;
/*自動填充主機IP*/
server_addr.sin_addr.s_addr=htonl(INADDR_ANY);//自動獲取網(wǎng)卡地址
server_addr.sin_port???????=htons(portnumber);
/*捆綁sockfd描述符*/
if(bind(sockfd,(struct?sockaddr?*)(&server_addr),sizeof(struct?sockaddr))==-1)
{
fprintf(stderr,"Bind?error:%sna",strerror(errno));
exit(1);
}
/*監(jiān)聽sockfd描述符*/
if(listen(sockfd,?10)==-1)
{
fprintf(stderr,"Listen?error:%sna",strerror(errno));
exit(1);
}
while(1)
{
/*服務(wù)器阻塞,直到客戶程序建立連接*/
sin_size=sizeof(struct?sockaddr_in);
if((new_fd?=?accept(sockfd,(struct?sockaddr?*)&client_addr,&sin_size))==-1)
{
fprintf(stderr,"Accept?error:%sna",strerror(errno));
exit(1);
}
pconnsocke?=?(int?*)?malloc(sizeof(int));
*pconnsocke?=?new_fd;
ret?=?pthread_create(&tid,?NULL,?rec_func,?(void?*)?pconnsocke);
if?(ret?<?0)
{
perror("pthread_create?err");
return?-1;
}
}
//close(sockfd);
exit(0);
}
客戶端
/*********************************************
服務(wù)器程序??TCPServer.c
公眾號:一口Linux
*********************************************/
#include?<stdio.h>
#include?<sys/types.h>
#include?<sys/socket.h>
#include?<arpa/inet.h>
#include?<errno.h>
#include?<string.h>
#include?<pthread.h>
#include?<stdlib.h>
#define?RECVBUFSIZE?1024
void?*func(void?*arg)
{
int?sockfd,new_fd,nbytes;
char?buffer[RECVBUFSIZE];
new_fd?=?*((int?*)?arg);
free(arg);
while(1)
{
if((nbytes=recv(new_fd,buffer,?RECVBUFSIZE,0))==-1)
{
fprintf(stderr,"Read?Error:%sn",strerror(errno));
exit(1);
}
buffer[nbytes]='';
printf("I?have?received:%sn",buffer);
}
}
int?main(int?argc,?char?*argv[])
{
int?sockfd;
char?buffer[RECVBUFSIZE];
struct?sockaddr_in?server_addr;
struct?hostent?*host;
int?portnumber,nbytes;
pthread_t?tid;
int?*pconnsocke?=?NULL;
int?ret;
//檢測參數(shù)個數(shù)
if(argc!=3)
{
fprintf(stderr,"Usage:%s?hostname?portnumberan",argv[0]);
exit(1);
}
//argv2?存放的是端口號?,讀取該端口,轉(zhuǎn)換成整型變量
if((portnumber=atoi(argv[2]))<0)
{
fprintf(stderr,"Usage:%s?hostname?portnumberan",argv[0]);
exit(1);
}
//創(chuàng)建一個?套接子
if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1)
{
fprintf(stderr,"Socket?Error:%san",strerror(errno));
exit(1);
}
//填充結(jié)構(gòu)體,ip和port必須是服務(wù)器的
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(portnumber);
server_addr.sin_addr.s_addr?=?inet_addr(argv[1]);//argv【1】?是server?ip地址
/*?í?§3ìDò·¢?eá??ó???ó*/
if(connect(sockfd,(struct?sockaddr?*)(&server_addr),sizeof(struct?sockaddr))==-1)
{
fprintf(stderr,"Connect?Error:%san",strerror(errno));
exit(1);
}
//創(chuàng)建線程
pconnsocke?=?(int?*)?malloc(sizeof(int));
*pconnsocke?=?sockfd;
ret?=?pthread_create(&tid,?NULL,?func,?(void?*)?pconnsocke);
if?(ret?<?0)
{
perror("pthread_create?err");
return?-1;
}
while(1)
{
#if?1
printf("input?msg:");
scanf("%s",buffer);
if(send(sockfd,buffer,strlen(buffer),0)==-1)
{
fprintf(stderr,"Write?Error:%sn",strerror(errno));
exit(1);
}
#endif
}
close(sockfd);
exit(0);
}
編譯
編譯線程,需要用到pthread庫,編譯命令如下:
- gcc s.c -o s -lpthread
- gcc cli.c -o c -lpthread先本機測試
- 開啟一個終端 ./s 8888
- 再開一個終端 ./cl 127.0.0.1 8888,輸入一個字符串"qqqqqqq"
- 再開一個終端 ./cl 127.0.0.1 8888,輸入一個字符串"yikoulinux"
有讀者可能會注意到,server創(chuàng)建子線程的時候用的是以下代碼:
pconnsocke?=?(int?*)?malloc(sizeof(int));
*pconnsocke?=?new_fd;
ret?=?pthread_create(&tid,?NULL,?rec_func,?(void?*)?pconnsocke);
if?(ret?<?0)
{
perror("pthread_create?err");
return?-1;
}
為什么必須要malloc一塊內(nèi)存專門存放這個新的套接字呢?
這個是一個很隱蔽,很多新手都容易犯的錯誤。下一章,我會專門給大家講解。