說到linux下多進程通信,有好幾種,之前也在喵哥的公眾號回復(fù)過,這里再拿出來,重新寫一遍:多進程通信有管道,而管道分為匿名和命名管道 ,后者比前者優(yōu)勢在于可以進行無親緣進程通信;此外信號也是進程通信的一種,比如我們最常用的就是設(shè)置ctrl+c的kill信號發(fā)送給進程;其次信號量一般來說是一種同步機制但是也可以認(rèn)為是通信,需要注意的是信號量、共享內(nèi)存、消息隊列在使用時候也有posix和system v的區(qū)別;還有我們今天的主角套接字( socket ) :套接字也是一種進程間通信機制。
線程間的通信的話,共享變量,此外在unpipc書描述的話,同步也屬于通訊機制,那么就要補充上線程中我們最多用的互斥量、條件變量、讀寫鎖、記錄鎖和線程中的信號量使用。
今天想分享一些socket編程的例子,socket嵌入式。linux開發(fā)很常用,用于進程間通信很方便,也有很多介紹,今天我也也來做自己的介紹分享。和別人不一樣的地方,我主要想分享socket 服務(wù)端在linux寫的代碼,使用vscode調(diào)試執(zhí)行,并且同時分享自己使用tcp監(jiān)控軟件去判斷socket通信正確性。
歡迎關(guān)注微信公眾號:羽林君,或者添加作者個人微信:become_me
socket通信基本函數(shù)介紹
在這里有一個簡單demo演示以及函數(shù)的介紹,大家打開這個鏈接就可以看到哈:
socket重要函數(shù)
socket通信有些固定的函數(shù),這里先給大家做簡單的分享:
int socket(int domain, int type, int protocol);
該函數(shù)用于創(chuàng)建一個socket描述符,它唯一標(biāo)識一個socket,這個socket描述字跟文件描述字一樣,后續(xù)的操作都有用到它,把它作為參數(shù),通過它來進行一些讀寫操作。創(chuàng)建socket的時候,也可以指定不同的參數(shù)創(chuàng)建不同的socket描述符,socket函數(shù)的三個參數(shù)分別為:
1.domain:參數(shù)domain表示該套接字使用的協(xié)議族,在Linux系統(tǒng)中支持多種協(xié)議族,對于TCP/IP協(xié)議來說,選擇AF_INET就足以,當(dāng)然如果你的IP協(xié)議的版本支持IPv6,那么可以選擇AF_INET6,可選的協(xié)議族具體見:
- AF_UNIX, AF_LOCAL:?本地通信-AF_INET :IPv4
- AF_INET6 :IPv6
- AF_IPX :IPX - Novell 協(xié)議
- AF_NETLINK :?內(nèi)核用戶界面設(shè)備
- AF_X25 :ITU-T X.25 / ISO-8208 協(xié)議
- AF_AX25 :?業(yè)余無線電 AX.25 協(xié)議
- AF_ATMPVC :?訪問原始ATM PVC
- AF_APPLETALK :AppleTalk
- AF_PACKET :?底層數(shù)據(jù)包接口
- AF_ALG :?內(nèi)核加密API的AF_ALG接口
2.type:參數(shù)type指定了套接字使用的服務(wù)類型,可能的類型有以下幾種:
- SOCK_STREAM:提供可靠的(即能保證數(shù)據(jù)正確傳送到對方)面向連接的Socket服務(wù),多用于資料(如文件)傳輸,如TCP協(xié)議。
- SOCK_DGRAM:是提供無保障的面向消息的Socket 服務(wù),主要用于在網(wǎng)絡(luò)上發(fā)廣播信息,如UDP協(xié)議,提供無連接不可靠的數(shù)據(jù)報交付服務(wù)。
- SOCK_SEQPACKET:為固定最大長度的數(shù)據(jù)報提供有序的,可靠的,基于雙向連接的數(shù)據(jù)傳輸路徑。
- SOCK_RAW:表示原始套接字,它允許應(yīng)用程序訪問網(wǎng)絡(luò)層的原始數(shù)據(jù)包,這個套接字用得比較少,暫時不用理會它。
- SOCK_RDM:提供不保證排序的可靠數(shù)據(jù)報層。
3.protocol:參數(shù)protocol指定了套接字使用的協(xié)議,在IPv4中,只有TCP協(xié)議提供SOCK_STREAM這種可靠的服務(wù),只有UDP協(xié)議提供SOCK_DGRAM服務(wù),對于這兩種協(xié)議,protocol的值均為0,因為當(dāng)protocol為0時,會自動選擇type類型對應(yīng)的默認(rèn)協(xié)議。
int bind(int sockfd, struct sockaddr *my_addr, socklen_t addrlen);
在進行網(wǎng)絡(luò)通信的時候,必須把一個套接字與一個IP地址或端口號相關(guān)聯(lián),這個bind就是綁定的過程。
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
這個connect()函數(shù)用于客戶端中,將sockfd與遠端IP地址、端口號進行綁定,在TCP客戶端中調(diào)用這個函數(shù)將發(fā)生握手過程(會發(fā)送一個TCP連接請求),并最終建立一個TCP連接,而對于UDP協(xié)議來說,調(diào)用這個函數(shù)只是在sockfd中記錄遠端IP地址與端口號,而不發(fā)送任何數(shù)據(jù),參數(shù)信息與bind()函數(shù)是一樣的。
int listen(int s, int backlog);
listen()函數(shù)只能在TCP服務(wù)器進程中使用,讓服務(wù)器進程進入監(jiān)聽狀態(tài),等待客戶端的連接請求,listen()函數(shù)在一般在bind()函數(shù)之后調(diào)用,在accept()函數(shù)之前調(diào)用,它的函數(shù)原型是:
int accept(int s, struct sockaddr *addr, socklen_t *addrlen);
accept()函數(shù)就是用于處理連接請求的,accept()函數(shù)用于TCP服務(wù)器中,等待著遠端主機的連接請求,并且建立一個新的TCP連接,在調(diào)用這個函數(shù)之前需要通過調(diào)用listen()函數(shù)讓服務(wù)器進入監(jiān)聽狀態(tài),如果隊列中沒有未完成連接套接字,并且套接字沒有標(biāo)記為非阻塞模式,accept()函數(shù)的調(diào)用會阻塞應(yīng)用程序直至與遠程主機建立TCP連接;如果一個套接字被標(biāo)記為非阻塞式而隊列中沒有未完成連接套接字, 調(diào)用accept()函數(shù)將立即返回EAGAIN。
ssize_t read(int fd, void *buf, size_t count);
read 從描述符 fd 中讀取 count 字節(jié)的數(shù)據(jù)并放入從 buf 開始的緩沖區(qū)中.
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
不論是客戶還是服務(wù)器應(yīng)用程序都可以用recv()函數(shù)從TCP連接的另一端接收數(shù)據(jù),它與read()函數(shù)的功能是差不多的。
ssize_t write(int fd, const void *buf, size_t count);
write()函數(shù)一般用于處于穩(wěn)定的TCP連接中傳輸數(shù)據(jù),當(dāng)然也能用于UDP協(xié)議中,它向套接字描述符 fd 中寫入 count 字節(jié)的數(shù)據(jù),數(shù)據(jù)起始地址由 buf 指定,函數(shù)調(diào)用成功返回寫的字節(jié)數(shù),失敗返回-1,并設(shè)置errno變量。
int send(int s, const void *msg, size_t len, int flags);
無論是客戶端還是服務(wù)器應(yīng)用程序都可以用send()函數(shù)來向TCP連接的另一端發(fā)送數(shù)據(jù)。
int sendto(int s, const void *msg, size_t len, int flags, const struct sockaddr *to, socklen_t tolen);
sendto()函數(shù)與send函數(shù)非常像,但是它會通過 struct sockaddr 指向的 to 結(jié)構(gòu)體指定要發(fā)送給哪個遠端主機,在to參數(shù)中需要指定遠端主機的IP地址、端口號等,而tolen參數(shù)則是指定to 結(jié)構(gòu)體的字節(jié)長度。
int close(int fd);
close()函數(shù)是用于關(guān)閉一個指定的套接字,在關(guān)閉套接字后,將無法使用對應(yīng)的套接字描述符
TCP客戶端一般流程
- 調(diào)用socket()函數(shù)創(chuàng)建一個套接字描述符。調(diào)用connect()函數(shù)連接到指定服務(wù)器中,端口號為服務(wù)器監(jiān)聽的端口號。調(diào)用write()函數(shù)發(fā)送數(shù)據(jù)。調(diào)用close()函數(shù)終止連接。
?//?創(chuàng)建套接字描述符
((sockfd?=?socket(AF_INET,?SOCK_STREAM,?0))?==?-1)?
//?建立TCP連接
(connect(sockfd,?(struct?sockaddr?*)&server,?sizeof(struct?sockaddr))
write(sockfd,?buffer,?sizeof(buffer))
close(sockfd);
TCP服務(wù)器一般流程
- 服務(wù)器的代碼流程如下:調(diào)用socket()函數(shù)創(chuàng)建一個套接字描述符。調(diào)用bind()函數(shù)綁定監(jiān)聽的端口號。調(diào)用listen()函數(shù)讓服務(wù)器進入監(jiān)聽狀態(tài)。調(diào)用accept()函數(shù)處理來自客戶端的連接請求。調(diào)用read()函數(shù)接收客戶端發(fā)送的數(shù)據(jù)。調(diào)用close()函數(shù)終止連接。
//?socket?create?and?verification
sockfd?=?socket(AF_INET,?SOCK_STREAM,?0);
//?binding?newly?created?socket?to?given?IP?and?verification???
if?((bind(sockfd,?(struct?sockaddr*)&server,?sizeof(server)))?!=?0)?
//?now?server?is?ready?to?listen?and?verification
if?((listen(sockfd,?5))?!=?0)?{
//?accept?the?data?packet?from?client?and?verification
connfd?=?accept(sockfd,?(struct?sockaddr*)&client,?&len);
if?(read(connfd,?buff,?sizeof(buff))?<=?0)?{
close(connfd);
close(sockfd);
這里也順帶分享一個socket 阻塞和非阻塞的機制
前面提到accept函數(shù)中,描述套接字沒有標(biāo)記為非阻塞模式,accept()函數(shù)的調(diào)用會阻塞應(yīng)用程序直至與遠程主機建立TCP連接;如果一個套接字被標(biāo)記為非阻塞式而隊列中沒有未完成連接套接字, 調(diào)用accept()函數(shù)將立即返回EAGAIN。但是socket默認(rèn)初始化是阻塞的,正常初始化后accept沒有收到客戶端的鏈接請求的話,就會一直的等待阻塞當(dāng)前線程,直到有客戶端進行鏈接請求。
那么如何才能把socket設(shè)置為非阻塞呢?用ioctl(sockfd, FIONBIO, &mode);
//-------------------------
//?Set?the?socket?I/O?mode:?In?this?case?FIONBIO
//?enables?or?disables?the?blocking?mode?for?the
//?socket?based?on?the?numerical?value?of?iMode.
//?If?iMode?=?0,?blocking?is?enabled;
//?If?iMode?!=?0,?non-blocking?mode?is?enabled.
u_long?iMode?=?1;??//non-blocking?mode?is?enabled.
ioctlsocket(m_socket,?FIONBIO,?&iMode);?//設(shè)置為非阻塞模式
一般大家介紹會說使用ioctlsocket,但是有些系統(tǒng)使用會報錯。如下:
ioctlsocket
會報錯,所以使用 ioctl
就好了,操作都是一樣的。
?#include?<sys/ioctl.h>
ioctl(sockfd,?FIONBIO,?&mode);
這是一個簡單的圖表分析,來自下面文章鏈接,大家有興趣也可以自行查看。
阻塞非阻塞的介紹 鏈接:
代碼實例
代碼有test_socket_client.cpp
、test_socket_server.h
、test_socket_server.cpp
三個文件,交互機制以及實現(xiàn)功能如下:
首先test_socket_client.cpp 是客戶端代碼,用來測試鏈接服務(wù)器端交互,用select進行接收數(shù)據(jù),并監(jiān)聽執(zhí)行終端是否有輸入信息,輸入信息立刻發(fā)送。
test_socket_server.h是test_socket_server.cpp使用定義的類和api的頭文件,而在test_socket_server.cpp實現(xiàn)了定義了一個支持多客戶端連接的通信接口,同時也時刻檢測執(zhí)行終端輸入信息,并廣播到全部鏈接的客戶端;而客戶端發(fā)過來的信息,服務(wù)端針對的點對點收發(fā),即接收到特定客戶端的信息只發(fā)送到該客戶端。其中使用了std::future + std::async
實現(xiàn)了通信的異步操作,并使用 impl模式
包裹了socket接口。在監(jiān)聽執(zhí)行終端信息時候分別使用了std::condition
和std::async
實現(xiàn),大家可以通過宏開關(guān)自行選擇測試。
還有些其他的技術(shù)使用,多線程的調(diào)度以及流的輸出,忽略SIGPIPE信號用來控制客戶端鏈接斷開之后代碼正常運行等,再后面我一一給大家分析介紹。
test_socket_client.cpp 這個文件就是隨便找了一個socket客戶端代碼,這個test_socket_client代碼來源是網(wǎng)絡(luò),大家也可以自己去寫或者網(wǎng)上自己找相關(guān)的用例,因為本次的重要部分是服務(wù)端server代碼,所以這塊就貼一下代碼。
#include?<stdio.h>
#include?<stdlib.h>
#include?<string.h>
#include?<errno.h>
#include?<sys/socket.h>
#include?<arpa/inet.h>
#include?<netinet/in.h>
#include?<sys/types.h>
#include?<unistd.h>
#include?<sys/time.h>
//g++?test_socket_client.cpp?-o??test_socket_client
#define?BUFLEN?1024
#define?PORT?8555
int?main(int?argc,?char?**argv)
{
????int?sockfd;
????struct?sockaddr_in?s_addr;
????socklen_t?len;
????unsigned?int?port;
????char?buf[BUFLEN];
????fd_set?rfds;
????struct?timeval?tv;
????int?retval,?maxfd;?
????
????/*建立socket*/
????if((sockfd?=?socket(AF_INET,?SOCK_STREAM,?0))?==?-1){
????????perror("socket");
????????exit(errno);
????}else
????????printf("socket?create?success!n");
????/*設(shè)置服務(wù)器ip*/
????memset(&s_addr,0,sizeof(s_addr));
????s_addr.sin_family?=?AF_INET;
????s_addr.sin_port?=?htons(PORT);
????if?(inet_aton("127.0.0.1",?(struct?in_addr?*)&s_addr.sin_addr.s_addr)?==?0)?{
????????perror("127.0.0.1");
????????exit(errno);
????}
??
????/*開始連接服務(wù)器*/?
????while(connect(sockfd,(struct?sockaddr*)&s_addr,sizeof(struct?sockaddr))?==?-1){
????????perror("connect");
????????sleep(1);
????????exit(errno);
????}
????while(1){
????????FD_ZERO(&rfds);
????????FD_SET(0,?&rfds);
????????maxfd?=?0;
????????FD_SET(sockfd,?&rfds);
????????if(maxfd?<?sockfd)
????????????maxfd?=?sockfd;
????????tv.tv_sec?=?6;
????????tv.tv_usec?=?0;
????????retval?=?select(maxfd+1,?&rfds,?NULL,?NULL,?&tv);
????????if(retval?==?-1){
????????????printf("select出錯,客戶端程序退出n");
????????????break;
????????}else?if(retval?==?0){
????????????printf("waiting...n");
????????????continue;
????????}else{
????????????/*服務(wù)器發(fā)來了消息*/
????????????if(FD_ISSET(sockfd,&rfds)){
????????????????/******接收消息*******/
????????????????bzero(buf,BUFLEN);
????????????????len?=?recv(sockfd,buf,BUFLEN,0);
????????????????if(len?>?0)
????????????????????printf("服務(wù)器發(fā)來的消息是:%sn",buf);
????????????????else{
????????????????????if(len?<?0?)
????????????????????????printf("接受消息失?。");
????????????????????else
????????????????????????printf("服務(wù)器退出了,聊天終止!n");
????????????????break;?
????????????????}
????????????}
????????????/*用戶輸入信息了,開始處理信息并發(fā)送*/
????????????if(FD_ISSET(0,?&rfds)){?
????????????????/******發(fā)送消息*******/?
????????????????bzero(buf,BUFLEN);
????????????????fgets(buf,BUFLEN,stdin);
???????????????
????????????????if(!strncasecmp(buf,"quit",4)){
????????????????????printf("client?請求終止聊天!n");
????????????????????break;
????????????????}
????????????????????len?=?send(sockfd,buf,strlen(buf),0);
????????????????if(len?>?0)
????????????????????printf("t消息發(fā)送成功:%sn",buf);?
????????????????else{
????????????????????printf("消息發(fā)送失敗!n");
????????????????????break;?
????????????????}?
????????????}
????????}
????
????}
????/*關(guān)閉連接*/
????close(sockfd);
????return?0;
}
test_socket_server.h 使用的頭文件,定義一些外部api
#ifndef?_TEST_SOCKET_H
#define?_TEST_SOCKET_H
#include?<functional>
#include?<memory>
#include?<thread>
#include?<vector>
namespace?linx_socket?{
int?Writen(int?fd,?const?void?*vptr,?int?n);
int?Readn(int?fd,?void?*vptr,?int?maxlen);
int?CreatSocket(const?char?*ip,?int?port);
int?StartLisen(int?fd);
bool?Close(int?fd);
}??//?namespace?linx_socket
class?DevSocket??{
?public:
??using?CallBack??=?std::function<void(int?,std::vector<uint8_t>&&)>;
??DevSocket();
??DevSocket(const?CallBack&?callback);
??bool?Send(int?fd,const?std::vector<uint8_t>?&data)?const?;
??//?std::vector<uint8_t>?Recive()?const?;?//當(dāng)建立連接后?就會在線程里面循環(huán)讀取客戶端發(fā)來的信息,?所以不需要專門寫rx函數(shù)
??~DevSocket(){}
??private:
??class?Socket;
??std::unique_ptr<Socket>?SocketImpl;
};
#endif
test_socket_server.cpp
里面包含的#include "log.h"
這個文件是我自己寫的log輸出文件,打印時間和顏色等,看著比較方便,大家使用代碼時候自行替換成自己需要printf或者std::cout或者自己的打印文件
#include?<stdio.h>
#include?<algorithm>
#include?<array>
#include?<chrono>
#include?<boost/thread/mutex.hpp>
#include?<mutex>
#include?<condition_variable>
#include?<iostream>
#include?<iterator>
#include?<string>
#include?<thread>
#include?<vector>
#include?<arpa/inet.h>
#include?<errno.h>
#include?<net/if.h>
#include?<netinet/in.h>
#include?<netinet/tcp.h>
#include?<sys/socket.h>
#include?<unistd.h>
#include?<future>
#include?"test_socket_server.h"
#include?"log.h"
//?g++?test_socket_server_optimiza_2.cpp?-o??test_socket_server_optimiza?-lboost_thread?-lpthread
namespace?linx_socket
{
????constexpr?int?socket_que_size?=?3;
????//使用select進行寫入
????int?Writen(int?fd,?const?void?*vptr,?int?n)
????{
????????ssize_t?nleft?=?n;
????????const?char?*ptr?=?(const?char?*)vptr;
????????fd_set?write_fd_set;
????????struct?timeval?timeout;
????????while?(nleft?>?0)
????????{
????????????ssize_t?nwriten?=?0;
????????????timeout.tv_sec?=?1;
????????????timeout.tv_usec?=?0;
????????????FD_ZERO(&write_fd_set);
????????????FD_SET(fd,?&write_fd_set);
????????????int?s_ret?=?select(FD_SETSIZE,?NULL,?&write_fd_set,?NULL,?&timeout);
????????????if?(s_ret?<?0)
????????????{
????????????????EXC_ERROR("-------write_fd_set?error------------");
????????????????return?-1;
????????????}
????????????else?if?(s_ret?==?0)
????????????{
????????????????usleep(100?*?1000);
????????????????EXC_ERROR("-------write_fd_set?timeout?------------");
????????????????continue;
????????????}
????????????if?((nwriten?=?write(fd,?ptr,?nleft))?<?0)
????????????{
????????????????if?(nwriten?<?0?&&?errno?==?EINTR)
????????????????{
????????????????????nwriten?=?0;
????????????????}
????????????????else
????????????????{
????????????????????EXC_ERROR("-------nwriten?error?=?%d?------------",?nwriten);
????????????????????return?-1;
????????????????}
????????????}
????????????nleft?-=?nwriten;
????????????ptr?+=?nwriten;
????????}
????????return?n;
????}
????//使用select進行讀取
????int?Readn(int?fd,?void?*vptr,?int?maxlen)
????{
????????bool?ret?=?false;
????????ssize_t?nread?=?0;
????????fd_set?read_fd_set;
????????struct?timeval?timeout;
????????while?(!ret)
????????{
????????????//?EXC_INFO("Readn?begine.");
????????????timeout.tv_sec?=?1;
????????????timeout.tv_usec?=?0;
????????????FD_ZERO(&read_fd_set);
????????????FD_SET(fd,?&read_fd_set);
????????????int?s_ret?=?select(FD_SETSIZE,?&read_fd_set,?NULL,?NULL,?&timeout);
????????????if?(s_ret?<?0)
????????????{
????????????????EXC_ERROR("-------select?error------------");
????????????????return?-1;
????????????}
????????????else?if?(s_ret?==?0)
????????????{
????????????????usleep(100?*?1000);
????????????????//?EXC_ERROR("-------select?timeout?------------");
????????????????continue;
????????????}
????????????if?((nread?=?read(fd,?vptr,?maxlen))?<?0)
????????????{
????????????????if?(errno?==?EINTR)
????????????????{
????????????????????EXC_ERROR("buff?=?%d,?fd=%d,?errno=%d.",?vptr,?fd,?errno);
????????????????????nread?=?0;
????????????????}
????????????????else
????????????????{
????????????????????EXC_ERROR("buff?=?%d,?fd=%d,?errno=%d.",?vptr,?fd,?errno);
????????????????????return?-1;
????????????????}
????????????}
????????????else
????????????{
????????????????if?(nread?==?0)
????????????????{
????????????????????EXC_ERROR("buff?=?%d,?fd=%d,?nread=%d.?data:%s",?vptr,?fd,?nread,?vptr);
????????????????}
????????????????//?else
????????????????//?{
????????????????//?????EXC_INFO("buff?=?%d,?fd=%d,?nread=%d.?data:%s",?vptr,?fd,?nread,?vptr);
????????????????//?}
????????????????ret?=?1;
????????????}
????????}
????????return?nread;
????}
????//進行處理來自客戶端的連接請求
????int?IsListened(int?fd)
????{
????????struct?sockaddr_in?c_addr;
????????socklen_t?c_lent?=?sizeof(c_addr);
????????int?fd_c?=?accept(fd,?(struct?sockaddr?*)&c_addr,?&c_lent);
????????if?(fd_c?<?0)
????????{
????????????if?(errno?==?EPROTO?||?errno?==?ECONNABORTED)
????????????{
????????????????return?-1;
????????????}
????????}
????????EXC_INFO("accept?%s:?%d?sucess",?inet_ntoa(c_addr.sin_addr),?ntohs(c_addr.sin_port));
????????return?fd_c;
????}
????//創(chuàng)建一個套接字描述符
????int?CreatSocket(const?char?*ip,?int?port)
????{
????????int?ret?=?-1;
????????//?EXC_INFO("CreatSocket");
????????int?fd?=?socket(AF_INET,?SOCK_STREAM,?IPPROTO_TCP);
????????if?(fd?<?0)
????????{
????????????return?-1;
????????}
????????int?reuse?=?1;
????????//設(shè)置套接字的一些選項 SOL_SOCKET:表示在Socket層 SO_REUSEADDR(允許重用本地地址和端口)
????????if?(setsockopt(fd,?SOL_SOCKET,?SO_REUSEADDR,?&reuse,?sizeof(reuse))?<?0)
????????{
????????????return?-1;
????????}
????????struct?sockaddr_in?s_addr;
????????memset(&s_addr,?0,?sizeof(s_addr));
????????s_addr.sin_addr.s_addr?=?htonl(INADDR_ANY);
????????s_addr.sin_port?=?htons(port);
????????s_addr.sin_family?=?AF_INET;
????????if?(bind(fd,?(struct?sockaddr?*)&s_addr,?sizeof(s_addr))?<?0)
????????{
????????????EXC_ERROR("bind?%s:?%d?error",?inet_ntoa(s_addr.sin_addr),?ntohs(s_addr.sin_port));
????????????close(fd);
????????????return?-2;
????????}
????????if?(listen(fd,?socket_que_size)?<?0)
????????{
????????????close(fd);
????????????return?-3;
????????}
????????return?fd;
????}
????int?CreatSocket(const?char?*ip,?int?port,?int?socket_que_size)
????{
????????int?ret?=?-1;
????????EXC_INFO("");
????????int?fd?=?socket(AF_INET,?SOCK_STREAM,?IPPROTO_TCP);
????????if?(fd?<?0)
????????{
????????????return?-1;
????????}
????????struct?sockaddr_in?s_addr;
????????memset(&s_addr,?0,?sizeof(s_addr));
????????s_addr.sin_addr.s_addr?=?htonl(INADDR_ANY);
????????s_addr.sin_port?=?htons(port);
????????s_addr.sin_family?=?AF_INET;
????????if?(bind(fd,?(struct?sockaddr?*)&s_addr,?sizeof(s_addr))?<?0)
????????{
????????????close(fd);
????????????return?-2;
????????}
????????if?(listen(fd,?socket_que_size)?<?0)
????????{
????????????close(fd);
????????????return?-3;
????????}
????????return?fd;
????}
????bool?Close(int?fd)
????{
????????close(fd);
????????return?true;
????}
}?//?namespace?linx_socket
class?Connection
{
public:
????Connection(int?fd,?DevSocket::CallBack?c)?:?call_back_f_(c),?fd_(fd)
????{
????????read_sta?=?std::async(std::launch::async,?[this]()
??????????????????????????????{?Read();?});?//循環(huán)讀取socket連接的數(shù)據(jù)
????};
????void?Read()
????{
????????while?(!kill_thread_)
????????{
????????????if?(fd_?<?0)
????????????????break;
????????????std::array<uint8_t,?kBuffSize>?buf;
????????????int?len?=?linx_socket::Readn(fd_,?buf.data(),?kBuffSize);
????????????if?(len?>?0)
????????????{
????????????????if?(call_back_f_)
????????????????{
????????????????????data_parser_mutex_.lock();
????????????????????call_back_f_(fd_,?{buf.begin(),?buf.begin()?+?len});
????????????????????data_parser_mutex_.unlock();
????????????????}
????????????}
????????????else?if?(len?<?0)
????????????{
????????????????kill_thread_?=?true;
????????????????EXC_ERROR("read?error,?fd=?%d,?rev?len=?%d.",?fd_,?len);
????????????????break;
????????????}
????????????else?if?(len?==?0)
????????????{
????????????????std::this_thread::sleep_for(std::chrono::seconds(1));
????????????????EXC_ERROR("call_back_f_?=?%d,?fd=%d,?rev?len=%d.",?call_back_f_,?fd_,?len);
????????????}
????????}
????}
????bool?Write(std::vector<uint8_t>?data)
????{
????????if?(linx_socket::Writen(fd_,?data.data(),?data.size())?<?0)
????????{
????????????kill_thread_?=?true;
????????????EXC_ERROR("Writen?error.");
????????????return?false;
????????}
????????return?true;
????}
????bool?GetIsKillThread()?{?return?kill_thread_;?}
????~Connection()
????{
????????EXC_INFO("kill_thread_?is?%d",?kill_thread_);
????????kill_thread_?=?true;
????????if?(fd_?!=?-1)
????????{
????????????linx_socket::Close(fd_);
????????????fd_?=?-1;
????????}
????}
????std::future<void>?&GetReadSta()?{?return?read_sta;?}
????int?GetFd()?{?return?fd_;?}
private:
????int?fd_?=?-1;
????bool?kill_thread_?=?false;
????DevSocket::CallBack?call_back_f_?=?nullptr;?/**/
????boost::mutex?data_parser_mutex_;
????std::future<void>?read_sta;
????constexpr?static?int?kBuffSize?=?1024;
};
class?DevSocket::Socket
{
public:
????Socket(){};
????Socket(std::pair<std::string,?int>?port,?const?CallBack?&callback_)
????????:?call_back_f_(callback_)
????{
????????EXC_WARN("Socket?");
????????int?n;
????????if?((n?=?linx_socket::CreatSocket(port.first.c_str(),?port.second))?<?0)
????????{
????????????throw?std::string("CreatSocket??error?")?+?std::to_string(n);
????????}
????????fd?=?n;
????????auto?threa_func?=?[this]()
????????{
????????????while?(!kill_thread_)
????????????{
????????????????//循環(huán)std::launch::async?傳遞的可調(diào)用對象異步執(zhí)行?
????????????????std::future<int>?listened_status?=?std::async(std::launch::async,?[this]()
??????????????????????????????????????????????????????????????{
??????????????????????????????????????????????????????????????????EXC_INFO("Listened?.");
??????????????????????????????????????????????????????????????????return?linx_socket::IsListened(fd);
??????????????????????????????????????????????????????????????});
????????????????//lister?套接字有沒有偵聽到連接,任務(wù)沒返回,沒有偵聽到連接套接字
????????????????while?(listened_status.wait_for(std::chrono::seconds(0))?!=
???????????????????????std::future_status::ready)
????????????????{
????????????????????if?(kill_thread_)
????????????????????????return;
????????????????????for?(auto?it?=?connections_.begin();?it?!=?connections_.end();)
????????????????????{
????????????????????????//任務(wù)返回了,說明該連接結(jié)束了
????????????????????????if?((*it)->GetReadSta().wait_for(std::chrono::seconds(0))?==
????????????????????????????std::future_status::ready)
????????????????????????{
????????????????????????????if?((*it)->GetReadSta().valid())
????????????????????????????{
????????????????????????????????EXC_ERROR("connection_kill_thread?is?%d,?socket_kill_thread_?is?=%d",?(*it)->GetIsKillThread(),?kill_thread_);
????????????????????????????????(*it)->GetReadSta().get();//主動退出
????????????????????????????}
????????????????????????????EXC_INFO("dis?connection_?fd=%d.",?(*it)->GetFd());
????????????????????????????boost::mutex::scoped_lock?lock(connection_mutex_);
????????????????????????????it?=?connections_.erase(it);
????????????????????????????if?(connections_.size()?<=?0)
????????????????????????????{
????????????????????????????????EXC_ERROR("all?is?dis?connected");
????????????????????????????}
????????????????????????}
????????????????????????if?(it?!=?connections_.end())
????????????????????????{
????????????????????????????++it;
????????????????????????}
????????????????????}
????????????????????std::this_thread::sleep_for(std::chrono::milliseconds(10));
????????????????}
?????????//????EXC_INFO(?"====================?thread?id:?%d"?,std::this_thread::get_id());
????????????????//有新的連接
????????????????int?clien_fd?=?listened_status.get();
????????????????if?(clien_fd?>?0)
????????????????{
????????????????????boost::mutex::scoped_lock?lock(connection_mutex_);
????????????????????connections_.push_back(
????????????????????????std::make_shared<Connection>(clien_fd,?call_back_f_));
????????????????????EXC_INFO("connection_?fd=%d.",?clien_fd);
????????????????}
????????????}
????????};
????????//?EXC_INFO("before?move?threa_func=%d.",?threa_func);
????????thread_?=?std::thread(std::move(threa_func));?//左值變右值傳入?減少拷貝
????????//?EXC_INFO("after??move?threa_func=%d.",?threa_func);
????}
????bool?SendData(const?std::vector<uint8_t>?&data,?std::shared_ptr<Connection>?connection)
????{
????????boost::mutex::scoped_lock?lock(connection_mutex_);
????????return?connection->Write(data);
????}
????std::vector<std::shared_ptr<Connection>>?GetConnections()
????{
????????return?connections_;
????}
????~Socket()
????{
????????kill_thread_?=?true;
????????if?(fd?!=?-1)
????????{
????????????linx_socket::Close(fd);
????????}
????????if?(thread_.joinable())
????????{
????????????thread_.join();
????????}
????}
private:
????int?fd?=?-1;
????bool?kill_thread_?=?false;
????CallBack?call_back_f_?=?nullptr;
????std::thread?thread_;
????std::vector<std::shared_ptr<Connection>>?connections_;
????boost::mutex?connection_mutex_;
};
#define?HOST?"127.0.0.1"?//?根據(jù)你服務(wù)器的IP地址修改
#define?PORT?8555????????//?根據(jù)你服務(wù)器進程綁定的端口號修改
DevSocket::DevSocket()
{
????EXC_WARN("new??DevSocket");
????std::pair<std::string,?int>?par{HOST,?PORT};
????SocketImpl?=?std::unique_ptr<Socket>(new?Socket(par,?nullptr));
}
DevSocket::DevSocket(const?CallBack?&callback)
{
????EXC_WARN("new??DevSocket");
????std::pair<std::string,?int>?par{HOST,?PORT};
????SocketImpl?=?std::unique_ptr<Socket>(new?Socket(par,?callback));
}
bool?DevSocket::Send(int?fd,?const?std::vector<uint8_t>?&data)?const
{
????for?(auto?connection?:?SocketImpl->GetConnections())
????{
????????if?(nullptr?==?connection)
????????????continue;
????????if?(fd?==?connection->GetFd()?||?fd?==?0)?//fd?==0?全部發(fā)送
????????{
????????????int?ret?=?SocketImpl->SendData(data,?connection);
????????????EXC_WARN("fd?%d??send?status?:%d",?connection->GetFd(),?ret);
????????}
????}
????return?true;
}
std::ostream?&operator<<(std::ostream?&out,?std::vector<uint8_t>?&data)
{
????EXC_WARN("operator?1<<<<<<<<<<<<<<");
????out?<<?"hex?";
????out?<<?std::hex;
????for?(auto?&d?:?data)
????{
????????out?<<?"0x"?<<?std::hex?<<?(int)d?<<?"?";
????}
????out?<<?std::endl;
????EXC_WARN("operator?2<<<<<<<<<<<<<");
????return?out;
}
#include?<signal.h>
void?pipesig_handler(int?sig)
{
????EXC_ERROR("receive?signal?%d",?sig);
}
#if?1?//std::async?控制發(fā)送
int?main(int?argc,?char?*argv[])
{
????DevSocket?*Device;
????//?為SIGPIPE添加信號處理函數(shù),處理完程序繼續(xù)執(zhí)行?1
????signal(SIGPIPE,?pipesig_handler);
????bool?SendFlag?=?false;
????std::vector<uint8_t>?send_data;?
????int?read_fd{-1};
????try
????{
????????EXC_INFO("device?socket?init");
????????Device?=
????????????new?DevSocket([&](int?fd,?std::vector<uint8_t>?&&d)
??????????????????????????{
??????????????????????????????EXC_INFO("recive?call?fd?:%d",?fd);
??????????????????????????????send_data?=?d;
??????????????????????????????SendFlag?=?true;
??????????????????????????????std::ostringstream?ss;
??????????????????????????????ss?<<?"recive?data:[";
??????????????????????????????std::for_each(send_data.begin(),?send_data.end(),
????????????????????????????????????????????[&](uint8_t?temp)
????????????????????????????????????????????{?ss?<<?"?"?<<?temp?<<?",";?});
??????????????????????????????EXC_WARN("%s]",?ss.str().c_str());
????????????????????????????//???std::cout?<<?send_data;?//使用operator<<?函數(shù)
??????????????????????????});
????}
????catch?(const?std::string?s)
????{
????????EXC_INFO("Device.emplace_back:%s",?s.c_str());
????????return?EXIT_FAILURE;
????}
????const?int?BUFLEN?=?1024;
????char?buf[BUFLEN];
????std::thread?input_keyboard?=?std::thread([&]
?????????????????????????????????????????????{
?????????????????????????????????????????????????while?(true)
?????????????????????????????????????????????????{
?????????????????????????????????????????????????????memset(buf,?0,?sizeof(buf));
?????????????????????????????????????????????????????/*fgets函數(shù):從流中讀取BUFLEN-1個字符*/
?????????????????????????????????????????????????????fgets(buf,?BUFLEN,?stdin);
?????????????????????????????????????????????????????EXC_INFO("from?terminal:%s",?buf);
?????????????????????????????????????????????????????if?(!strncasecmp(buf,?"quit",?4))
?????????????????????????????????????????????????????{
?????????????????????????????????????????????????????????EXC_INFO("server?quit!");
?????????????????????????????????????????????????????????exit(0);
?????????????????????????????????????????????????????}
?????????????????????????????????????????????????????std::vector<uint8_t>?send_msg;
?????????????????????????????????????????????????????for?(int?i?=?0;?buf[i]?!=?'