13 实例:漫谈一个服务器的结构
本文将介绍我曾经做过的一个项目的服务器架构和服务器编程的一些重要细节。一、程序运行环境操作系统centos 7.0编译器gcc/g 4.8.3 cmake 2.8.11mysql数据库5.5.47项目代码管理工具VS2013一、程序结构该程序总共有17个线程其中分为9个数据库工作线程D和一个日志线程L6个普通工作线程W一个主线程M。以下会用这些字母来代指这些线程一、数据库工作线程的用途9个数据库工作线程在线程启动之初与mysql建立连接也就是说每个线程都与mysql保持一路连接共9个数据库连接。每个数据库工作线程同时存在两个任务队列第一个队列A存放需要执行数据库增删查改操作的任务sqlTask第二个队列B存放sqlTask执行完成后的结果。sqlTask执行完成后立即放入结果队列中因而结果队列中任务也是一个个的需要执行的任务。大致伪代码如下void db_thread_func() { while (!m_bExit) { if (NULL ! (pTask m_sqlTask.Pop())) { //从m_sqlTask中取出的任务先执行完成后pTask将携带结果数据 pTask-Execute(); //得到结果后立刻将该任务放入结果任务队列 m_resultTask.Push(pTask); continue; } sleep(1000); } }现在的问题来了1.任务队列A中的任务从何而来目前只有消费者没有生产者那么生产者是谁2.任务队列B中的任务将去何方目前只有生产者没有消费者。这两个问题先放一会儿等到后面我再来回答。二工作线程和主线程在介绍主线程和工作线程具体做什么时我们介绍下服务器编程中常常抽象出来的几个概念这里以tcp连接为例1.TcpServer 即Tcp服务服务器需要绑定ip地址和端口号并在该端口号上侦听客户端的连接往往由一个成员变量TcpListener来管理侦听细节。所以一个TcpServer要做的就是这些工作。除此之外每当有新连接到来时TcpServer需要接收新连接当多个新连接存在时TcpServer需要有条不紊地管理这些连接连接的建立、断开等即产生和管理下文中说的TcpConnection对象。2.一个连接对应一个TcpConnection对象TcpConnection对象管理着这个连接的一些信息如连接状态、本端和对端的ip地址和端口号等。3.数据通道对象ChannelChannel记录了socket的句柄因而是一个连接上执行数据收发的真正执行者Channel对象一般作为TcpConnection的成员变量。4.TcpSession对象是将Channel收取的数据进行解包或者对准备好的数据进行装包并传给Channel发送。归纳起来一个TcpServer依靠TcpListener对新连接的侦听和处理依靠TcpConnection对象对连接上的数据进行管理TcpConnection实际依靠Channel对数据进行收发依靠TcpSession对数据进行装包和解包。也就是说一个TcpServer存在一个TcpListener对应多个TcpConnection有几个TcpConnection就有几个TcpSession同时也就有几个Channel。以上说的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是服务器框架的网络层。一个好的网络框架应该做到与业务代码脱耦。即上层代码只需要拿到数据执行业务逻辑而不用关注数据的收发和网络数据包的封包和解包以及网络状态的变化比如网络断开与重连。拿数据的发送来说当业务逻辑将数据交给TcpSessionTcpSession将数据装好包后装包过程后可以有一些加密或压缩操作交给TcpConnection::SendData()而TcpConnection::SendData()实际是调用Channel::SendData()因为Channel含有socket句柄所以Channel::SendData()真正调用send()/sendto()/write()方法将数据发出去。对于数据的接收稍微有一点不同通过select()/poll()/epoll()等IO multiplex技术确定好了哪些TcpConnection上有数据到来后激活该TcpConnection的Channel对象去调用recv()/recvfrom()/read()来收取数据。数据收到以后将数据交由TcpSession来处理最终交给业务层。注意数据收取、解包乃至交给业务层是一定要分开的。我的意思是最好不要解包并交给业务层和数据收取的逻辑放在一起。因为数据收取是IO操作而解包和交给业务层是逻辑计算操作。IO操作一般比逻辑计算要慢。到底如何安排要根据服务器业务来取舍也就是说你要想好你的服务器程序的性能瓶颈在网络IO还是逻辑计算即使是网络IO也可以分为上行操作和下行操作上行操作即客户端发数据给服务器下行即服务器发数据给客户端。有时候数据上行少下行大。如游戏服务器一个npc移动了位置上行是该客户端通知服务器自己最新位置而下行确是服务器要告诉在场的每个客户端。在我的前面章节中介绍了工作线程的流程while (!m_bQuit) { epoll_or_select_func(); handle_io_events(); handle_other_things(); }其中epoll_or_select_func()即是上文所说的通过select()/poll()/epoll()等IO multiplex技术确定好了哪些TcpConnection上有数据到来。我的服务器代码中一般只会监测socket可读事件而不会监测socket可写事件。至于如何发数据文章后面会介绍。所以对于可读事件以epoll为例这里需要设置的标识位是EPOLLIN 普通可读事件当连接正常时产生这个事件recv()/read()函数返回收到的字节数当连接关闭这两个函数返回0也就是说我们设置这个标识已经可以监测到新来数据和对端关闭事件EPOLLRDHUP 对端关闭事件linux man手册上说这个事件可以监测对端关闭但我实际调试时发送即使对端关闭也没触发这个事件仍然是EPOLLIN只不过此时调用recv()/read()函数返回值会为0所以实际项目中是否可以通过设置这个标识来监测对端关闭仍然待考证EPOLLPRI 带外数据muduo里面将epoll_wait的超时事件设置为1毫秒我的另一个项目将epoll_wait超时时间设置为10毫秒。这两个数值供大家参考。这个项目中工作线程和主线程都是上文代码中的逻辑主线程监听侦听socket上的可读事件也就是监测是否有新连接来了。主线程和每个工作线程上都存在一个epollfd。如果新连接来了则在主线程的handle_io_events()中接收新连接。产生的新连接的socket句柄挂接到哪个线程的epollfd上呢这里采取的做法是round-robin算法即存在一个对象CWorkerThreadManager记录了各个工作线程上工作状态。伪码大致如下void attach_new_fd(int newsocketfd) { workerthread get_next_worker_thread(next); workerthread.attach_to_epollfd(newsocketfd); next; if (next max_worker_thread_num) next 0; }即先从第一个工作线程的epollfd开始挂接新来socket接着累加索引这样下次就是第二个工作线程了。如果所以超出工作线程数目则从第一个工作重新开始。这里解决了新连接socket“负载均衡”的问题。在实际代码中还有个需要注意的细节就是epoll_wait的函数中的struct epoll_event 数量开始到底要设置多少个才合理存在的顾虑是多了浪费少了不够用我在曾经一个项目中直接用的是4096const int EPOLL_MAX_EVENTS 4096; const int dwSelectTimeout 10000; struct epoll_event events[EPOLL_MAX_EVENTS]; int nfds epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000);我在陈硕的muduo网络库中发现作者才用了一个比较好的思路即动态扩张数量开始是n个当发现有事件的fd数量已经到达n个后将struct epoll_event数量调整成2n个下次如果还不够则变成4n个以此类推作者巧妙地利用stl::vector在内存中的连续性来实现了这种思路//初始化代码 std::vectorstruct epoll_event events_(16); //线程循环里面的代码 while (m_bExit) { int numEvents ::epoll_wait(epollfd_, *events_.begin(), static_castint(events_.size()), 1); if (numEvents 0) { if (static_castsize_t(numEvents) events_.size()) { events_.resize(events_.size() * 2); } } }读到这里你可能觉得工作线程所做的工作也不过就是调用handle_io_events()来接收网络数据其实不然工作线程也可以做程序业务逻辑上的一些工作。也就是在handle_other_things()里面。那如何将这些工作加到handle_other_things()中去做呢写一个队列任务先放入队列再让handle_other_things()从队列中取出来做我在该项目中也借鉴了muduo库的做法。即handle_other_things()中调用一系列函数指针伪码如下void do_other_things() { somefunc(); } //m_functors是一个stl::vector,其中每一个元素为一个函数指针 void somefunc() { for (size_t i 0; i m_functors.size(); i) { m_functors[i](); } m_functors.clear(); }当任务产生时只要我们将执行任务的函数push_back到m_functors这个stl::vector对象中即可。但是问题来了如果是其他线程产生的任务两个线程同时操作m_functors必然要加锁这也会影响效率。muduo是这样做的void add_task(const Functor cb) { std::unique_lockstd::mutex lock(mutex_); m_functors.push_back(cb); } void do_task() { std::vectorFunctor functors; { std::unique_lockstd::mutex lock(mutex_); functors.swap(m_functors); } for (size_t i 0; i functors.size(); i) { functors[i](); } }看到没有利用一个栈变量functors将m_functors中的任务函数指针倒换swap过来了这样大大减小了对m_functors操作时的加锁粒度。前后变化变化前相当于原来A给B多少东西B消耗多少A给的时候B不能消耗B消耗的时候A不能给。现在变成A将东西放到篮子里面去B从篮子里面拿B如果拿去一部分后只有消耗完了才会来拿或者A通知B去篮子里面拿而B忙碌时A是不会通知B来拿这个时候A只管将东西放在篮子里面就可以了。bool bBusy false; void add_task(const Functor cb) { std::unique_lockstd::mutex lock(mutex_); m_functors_.push_back(cb); //B不忙碌时只管往篮子里面加不要通知B if (!bBusy) { wakeup_to_do_task(); } } void do_task() { bBusy true; std::vectorFunctor functors; { std::unique_lockstd::mutex lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i 0; i functors.size(); i) { functors[i](); } bBusy false; }看多巧妙的做法因为每个工作线程都存在一个m_functors现在问题来了如何将产生的任务均衡地分配给每个工作线程。这个做法类似上文中如何将新连接的socket句柄挂载到工作线程的epollfd上也是round-robin算法。上文已经描述此处不再赘述。还有种情况就是希望任务产生时工作线程能够立马执行这些任务而不是等epoll_wait超时返回之后。这个时候的做法就是使用一些技巧唤醒epoll_waitlinux系统可以使用socketpair或timerevent、eventfd等技巧这个细节在我的博文《服务器端编程心得一—— 主线程与工作线程的分工》已经详细介绍过了。上文中留下三个问题1.数据库线程任务队列A中的任务从何而来目前只有消费者没有生产者那么生产者是谁2.数据库线程任务队列B中的任务将去何方目前只有生产者没有消费者。3.业务层的数据如何发送出去问题1的答案是业务层产生任务可能会交给数据库任务队列A这里的业务层代码可能就是工作线程中do_other_things()函数执行体中的调用。至于交给这个9个数据库线程的哪一个的任务队列同样采用了round-robin算法。所以就存在一个对象CDbThreadManager来管理这九个数据库线程。下面的伪码是向数据库工作线程中加入任务bool CDbThreadManager::AddTask(IMysqlTask* poTask ) { if (m_index m_dwThreadsCount) { m_index 0; } return m_aoMysqlThreads[m_index].AddTask(poTask); }同理问题2中的消费者也可能就是do_other_things()函数执行体中的调用。现在来说问题3业务层的数据产生后经过TcpSession装包后需要发送的话产生任务丢给工作线程的do_other_things()然后在相关的Channel里面发送因为没有监测该socket上的可写事件所以该数据可能调用send()或者write()时会阻塞没关系sleep()一会儿继续发送一直尝试到数据发出去。伪码如下bool Channel::Send() { int offset 0; while (true) { int n ::send(socketfd, buf offset, length - offset); if (n -1) { if (errno EWOULDBLOCK) { ::sleep(100); continue; } } //对方关闭了socket这端建议也关闭 else if (n 0) { close(socketfd); return false; } offset n; if (offset length) break; } return true; }