1.阻塞/非阻塞、同步/异步(网络IO)
阻塞/非阻塞、同步/异步(网络IO)
数据就绪:根据系统IO操作的就绪状态
数据读写:根据应用程序和内核的交互方式
陈硕:在处理 IO 的时候,阻塞和非阻塞都是同步 IO,只有使用了特殊的 API 才是异步 IO。
一个典型的网络IO接口调用,分为两个阶段,分别是“数据就绪” 和 “数据读写”,数据就绪阶段分为阻塞和非阻塞,表现得结果就是,阻塞当前线程或是直接返回。
同步 表示A向B请求调用一个网络IO接口时(或者调用某个业务逻辑API接口时),数据的读写都是由请求方A自己来完成的(不管是阻塞还是非阻塞);异步 表示A向B请求调用一个网络IO接口时(或者调用某个业务逻辑API接口时),向B传入请求的事件以及事件发生时通知的方式,A就可以处理其它逻辑了,当B监听到事件处理完成后,会用事先约定好的通知方式,通知A处理结果。
5.2 Unix/Linux上的5种IO模型 a.阻塞 blocking (BIO) 调用者调用了某个函数,等待这个函数返回,期间什么也不做,不停的去检查这个函数有没有返回,必须等这个函数返回才能进行下一步动作。
b.非阻塞 non-blocking(NIO) 非阻塞等待,每隔一段时间就去检测IO事件是否就绪。没有就绪就可以做其他事。非阻塞I/O执行系统调用总是立即返回,不管事件是否已经发生,若事件没有发生,则返回-1,此时可以根据 errno 区分这两种情况,对于accept,recv 和 send,事件未发生时,errno 通常被设置成 EAGAIN。
c.IO复用(IO multiplexing) Linux 用 select/poll/epoll 函数实现 IO 复用模型,这些函数也会使进程阻塞,但是和阻塞IO所不同的是这些函数可以同时阻塞多个IO操作。而且可以同时对多个读操作、写操作的IO函数进行检测。直到有数 据可读或可写时,才真正调用IO操作函数
d.信号驱动(signal-driven) Linux 用套接口进行信号驱动 IO,安装一个信号处理函数,进程继续运行并不阻塞,当IO事件就绪,进程收到SIGIO 信号,然后处理 IO 事件。
内核在第一个阶段是异步,在第二个阶段是同步;与非阻塞IO的区别在于它提供了消息通知机制,不需要用户进程不断的轮询检查,减少了系统API的调用次数,提高了效率。
e.异步(asynchronous) Linux中,可以调用 aio_read 函数告诉内核描述字缓冲区指针和缓冲区的大小、文件偏移及通知的方式,然后立即返回,当内核将数据拷贝到缓冲区后,再通知应用程序
5.3 Web服务器简介及Http协议 一个 Web Server 就是一个服务器软件(程序),或者是运行这个服务器软件的硬件(计算机)。其主要功能是通过 HTTP 协议与客户端(通常是浏览器(Browser))进行通信,来接收,存储,处理来自客户端的 HTTP 请求,并对其请求做出 HTTP 响应,返回给客户端其请求的内容(文件、网页等)或返回一个 Error 信息
通常用户使用 Web 浏览器与相应服务器进行通信。在浏览器中键入“域名”或“IP地址:端口号”,浏览器则先将你的域名解析成相应的 IP 地址或者直接根据你的IP地址向对应的 Web 服务器发送一个 HTTP 请求。这一过程首先要通过 TCP 协议的三次握手建立与目标 Web 服务器的连接,然后 HTTP 协议生成针对目标 Web 服务器的 HTTP 请求报文,通过 TCP、IP 等协议发送到目标 Web 服务器上。
HTTP协议 简介 超文本传输协议(Hypertext Transfer Protocol,HTTP)是一个简单的CS的请求 - 响应协议,它通常运行在TCP 之上。它指定了客户端可能发送给服务器什么样的消息以及得到什么样的响应。请求和响应消息的头以 ASCII 形式给出;而消息内容则具有一个类似 MIME 的格式。HTTP是万维网的数据通信的基础。
HTTP的发展是由蒂姆·伯纳斯-李于1989年在欧洲核子研究组织(CERN)所发起。HTTP的标准制定由万维网协会(World Wide Web Consortium,W3C)和互联网工程任务组(Internet Engineering TaskForce,IETF)进行协调,最终发布了一系列的RFC,其中最著名的是1999年6月公布的 RFC 2616,定义了HTTP协议中现今广泛使用的一个版本——HTTP 1.1。
概述 HTTP 是一个客户端终端(用户)和服务器端(网站)请求和应答的标准(TCP)。通过使用网页浏览器、网络爬虫或者其它的工具,客户端发起一个HTTP请求到服务器上指定端口(默认端口为80)。我们称这个客户端为用户代理程序(user agent)。应答的服务器上存储着一些资源,比如 HTML 文件和图像。我们称这个应答服务器为源服务器(origin server)。在用户代理和源服务器中间可能存在多个“中间层”,比如代理服务器、网关或者隧道(tunnel)。
尽管 TCP/IP 协议是互联网上最流行的应用,HTTP 协议中,并没有规定必须使用它或它支持的层。事实上,HTTP可以在任何互联网协议上,或其他网络上实现。HTTP 假定其下层协议提供可靠的传输。因此,任何能够提供这种保证的协议都可以被其使用。因此也就是其在 TCP/IP 协议族使用 TCP 作为其传输层。
通常,由HTTP客户端发起一个请求,创建一个到服务器指定端口(默认是80端口)的 TCP 连接。HTTP服务器则在那个端口监听客户端的请求。一旦收到请求,服务器会向客户端返回一个状态,比如”HTTP/1.1 200 OK”,以及返回的内容,如请求的文件、错误消息、或者其它信息。
工作原理 HTTP 协议定义 Web 客户端如何从 Web 服务器请求 Web 页面,以及服务器如何把 Web 页面传送给客户端。HTTP 协议采用了请求/响应模型 。客户端向服务器发送一个请求报文,请求报文包含请求的方法、URL、协议版本、请求头部和请求数据。服务器以一个状态行作为响应,响应的内容包括协议的版本、成功或者错误代码、服务器信息、响应头部和响应数据。
以下是 HTTP 请求/响应的步骤:
客户端连接到 Web 服务器 一个HTTP客户端,通常是浏览器,与 Web 服务器的 HTTP 端口(默认为 80 )建立一个 TCP 套接字连接。例如,http://www.baidu.com。(URL)
发送 HTTP 请求 通过 TCP 套接字,客户端向 Web 服务器发送一个文本的请求报文,一个请求报文 由请求行、请求头部、空行和请求数据 4 部分 组成。
服务器接受请求并返回 HTTP 响应 Web 服务器解析请求,定位请求资源。服务器将资源复本写到 TCP 套接字,由客户端读取。一个响应 由状态行、响应头部、空行和响应数据 4 部分 组成。
释放连接 TCP 连接 若 connection 模式为 close,则服务器主动关闭 TCP连接,客户端被动关闭连接,释放 TCP 连接;若connection 模式为 keepalive,则该连接会保持一段时间,在该时间内可以继续接收请求;
客户端浏览器解析 HTML 内容 客户端浏览器首先解析状态行,查看表明请求是否成功的状态代码。然后解析每一个响应头,响应 头告知以下为若干字节的 HTML 文档和文档的字符集。客户端浏览器读取响应数据 HTML,根据 HTML 的语法对其进行格式化,并在浏览器窗口中显示。
例如:在浏览器地址栏键入URL ,按下回车之后会经历以下流程:
浏览器向 DNS 服务器请求解析该 URL 中的域名所对应的 IP 地址;
解析出 IP 地址后,根据该 IP 地址和默认端口 80,和服务器建立 TCP 连接;
浏览器发出读取文件( URL 中域名后面部分对应的文件)的 HTTP 请求,该请求报文作为 TCP 三次握手的第三个报文的数据发送给服务器;
服务器对浏览器请求作出响应,并把对应的 HTML 文本发送给浏览器;
释放 TCP 连接;
浏览器将该 HTML 文本并显示内容。
HTTP 协议是基于 TCP/IP 协议之上的应用层协议,基于 请求-响应 的模式。HTTP 协议规定,请求从客户端发出,最后服务器端响应该请求并返回。换句话说,肯定是先从客户端开始建立通信的,服务器端在没有接收到请求之前不会发送响应。
HTTP 请求方法 HTTP/1.1 协议中共定义了八种方法(也叫“动作”)来以不同方式操作指定的资源:
GET:向指定的资源发出“显示”请求。使用 GET 方法应该只用在读取数据,而不应当被用于产生“副作用”的操作中,例如在 Web Application 中。其中一个原因是 GET 可能会被网络蜘蛛等随意访问。
HEAD:与 GET 方法一样,都是向服务器发出指定资源的请求。只不过服务器将不传回资源的本文部分。它的好处在于,使用这个方法可以在不必传输全部内容的情况下,就可以获取其中“关于该资源的信息”(元信息或称元数据)。
POST:向指定资源提交数据,请求服务器进行处理(例如提交表单或者上传文件)。数据被包含在请求本文中。这个请求可能会创建新的资源或修改现有资源,或二者皆有。
PUT:向指定资源位置上传其最新内容。
DELETE:请求服务器删除 Request-URI 所标识的资源。
TRACE:回显服务器收到的请求,主要用于测试或诊断。
OPTIONS:这个方法可使服务器传回该资源所支持的所有 HTTP 请求方法。用’*’来代替资源名称,向 Web 服务器发送 OPTIONS 请求,可以测试服务器功能是否正常运作。
CONNECT:HTTP/1.1 协议中预留给能够将连接改为管道方式的代理服务器。通常用于SSL加密服务器的链接(经由非加密的 HTTP 代理服务器)。
HTTP状态码 所有HTTP响应的第一行都是状态行,依次是当前HTTP版本号,3位数字组成的状态代码,以及描述状态的短语,彼此由空格分隔。
状态代码的第一个数字代表当前响应的类型:
1xx消息——请求已被服务器接收,继续处理
2xx成功——请求已成功被服务器接收、理解、并接受
3xx重定向——需要后续操作才能完成这一请求
4xx请求错误——请求含有词法错误或者无法被执行
5xx服务器错误——服务器在处理某个正确请求时发生错误
虽然 RFC 2616 中已经推荐了描述状态的短语,例如”200 OK”,”404 Not Found”,但是WEB开发者仍然能够自行决定采用何种短语,用以显示本地化的状态描述或者自定义信息。
5.4 服务器编程基本框架
模块
功能
IO处理单元
处理客户连接,读写网络数据
逻辑单元
业务进程或线程
网络存储单元
数据库、文件或缓存
请求队列
各单元之间的通信方式
I/O 处理单元 是服务器管理客户连接的模块。它通常要完成以下工作:等待并接受新的客户连接,接收客户数据,将服务器响应数据返回给客户端。但是数据的收发不一定在 I/O 处理单元中执行,也可能在逻辑单元中执行,具体在何处执行取决于事件处理模式。
一个逻辑单元 通常是一个进程或线程。它分析并处理客户数据,然后将结果传递给 I/O 处理单元或者直接发送给客户端(具体使用哪种方式取决于事件处理模式)。服务器通常拥有多个逻辑单元,以实现对多个客户任务的并发处理。
网络存储单元可以是数据库、缓存和文件,但不是必须的。
请求队列 是各单元之间的通信方式的抽象。I/O 处理单元接收到客户请求时,需要以某种方式通知一个逻辑单元来处理该请求。同样,多个逻辑单元同时访问一个存储单元时,也需要采用某种机制来协调处理竞态条件。请求队列通常被实现为池的一部分。
两种高效的事件处理模式 服务器程序通常需要处理三类事件:I/O 事件、信号及定时事件 。有两种高效的事件处理模式:Reactor和 Proactor,同步 I/O 模型 通常用于实现 Reactor 模式,异步 I/O 模型 通常用于实现 Proactor 模式。
Reactor模式 (反应堆) 要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将该事件通知工作线程(逻辑单元),将 socket 可读可写事件放入请求队列,交给工作线程处理。除此之外,主线程不做任何其他实质性的工作。读写数据,接受新的连接,以及处理客户请求均在工作线程中完成。
使用同步 I/O(以 epoll_wait 为例)实现的 Reactor 模式的工作流程是:
主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
主线程调用 epoll_wait 等待 socket 上有数据可读。
当 socket 上有数据可读时, epoll_wait 通知主线程。主线程则将 socket 可读事件放入请求队列。
睡眠在请求队列上的某个工作线程被唤醒,它从 socket 读取数据,并处理客户请求,然后往 epoll内核事件表中注册该 socket 上的写就绪事件。
当主线程调用 epoll_wait 等待 socket 可写。
当 socket 可写时,epoll_wait 通知主线程。主线程将 socket 可写事件放入请求队列。
睡眠在请求队列上的某个工作线程被唤醒,它往 socket 上写入服务器处理客户请求的结果。
Reactor 模式的工作流程:
Proactor模式 Proactor 模式将所有 I/O 操作都交给主线程和内核来处理(进行读、写),工作线程仅仅负责业务逻辑。使用异步 I/O 模型(以 aio_read 和 aio_write 为例)实现的 Proactor 模式的工作流程是:
主线程调用 aio_read 函数向内核注册 socket 上的读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例)。
主线程继续处理其他逻辑。
当 socket 上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用。
应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求后,调用 aio_write 函数向内核注册 socket 上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序。
主线程继续处理其他逻辑。
当用户缓冲区的数据被写入 socket 之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕。
应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭 socket。
Proactor 模式的工作流程:
模拟Proactor模式 使用同步 I/O 方式模拟出 Proactor 模式。原理是:主线程执行数据读写操作,读写完成之后,主线程向工作线程通知这一”完成事件“。那么从工作线程的角度来看,它们就直接获得了数据读写的结果,接下来要做的只是对读写的结果进行逻辑处理。
使用同步 I/O 模型(以 epoll_wait为例)模拟出的 Proactor 模式的工作流程如下:
主线程往 epoll 内核事件表中注册 socket 上的读就绪事件。
主线程调用 epoll_wait 等待 socket 上有数据可读。
当 socket 上有数据可读时,epoll_wait 通知主线程。主线程从 socket 循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往 epoll 内核事件表中注册 socket 上的写就绪事件。
主线程调用 epoll_wait 等待 socket 可写。
当 socket 可写时,epoll_wait 通知主线程。主线程往 socket 上写入服务器处理客户请求的结果。
同步 I/O 模拟 Proactor 模式的工作流程:
5.5线程池 线程池是由服务器预先创建的一组子线程,线程池中的线程数量应该和 CPU 数量差不多。线程池中的所有子线程都运行着相同的代码。当有新的任务到来时,主线程将通过某种方式选择线程池中的某一个子线程来为之服务。相比与动态的创建子线程,选择一个已经存在的子线程的代价显然要小得多。至于主线程选择哪个子线程来为新任务服务,则有多种方式:
主线程使用某种算法来主动选择子线程。最简单、最常用的算法是随机算法和 Round Robin(轮流选取)算法,但更优秀、更智能的算法将使任务在各个工作线程中更均匀地分配,从而减轻服务器的整体压力。
主线程和所有子线程通过一个共享的工作队列来同步,子线程都睡眠在该工作队列上。当有新的任务到来时,主线程将任务添加到工作队列中。这将唤醒正在等待任务的子线程,不过只有一个子线程将获得新任务的”接管权“,它可以从工作队列中取出任务并执行之,而其他子线程将继续睡眠在工作队列上。
线程池的一般模型为:
线程池中的线程数量最直接的限制因素是中央处理器(CPU)的处理器(processors/cores)的数量N :如果你的CPU是4-cores的,对于CPU密集型的任务(如视频剪辑等消耗CPU计算资源的任务)来说,那线程池中的线程数量最好也设置为4(或者+1防止其他因素造成的线程阻塞);对于IO密集型的任务,一般要多于CPU的核数,因为线程间竞争的不是CPU的计算资源而是IO,IO的处理一般较慢,多于cores数的线程将为CPU争取更多的任务,不至在线程处理IO的过程造成CPU空闲导致资源浪费。
空间换时间,浪费服务器的硬件资源,换取运行效率。
池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源。
当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配。
当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源。
#ifndef LOCKER_H #define LOCKER_H #include <exception> #include <pthread.h> #include <semaphore.h> class locker {public : locker() { if (pthread_mutex_init(&m_mutex, NULL ) != 0 ) { throw std ::exception(); } } ~locker() { pthread_mutex_destroy(&m_mutex); } bool lock () { return pthread_mutex_lock(&m_mutex) == 0 ; } bool unlock () { return pthread_mutex_unlock(&m_mutex) == 0 ; } pthread_mutex_t *get () { return &m_mutex; } private : pthread_mutex_t m_mutex; }; class cond {public : cond(){ if (pthread_cond_init(&m_cond, NULL ) != 0 ) { throw std ::exception(); } } ~cond() { pthread_cond_destroy(&m_cond); } bool wait (pthread_mutex_t *m_mutex) { int ret = 0 ; ret = pthread_cond_wait(&m_cond, m_mutex); return ret == 0 ; } bool timewait (pthread_mutex_t *m_mutex, struct timespec t) { int ret = 0 ; ret = pthread_cond_timedwait(&m_cond, m_mutex, &t); return ret == 0 ; } bool signal () { return pthread_cond_signal(&m_cond) == 0 ; } bool broadcast () { return pthread_cond_broadcast(&m_cond) == 0 ; } private : pthread_cond_t m_cond; }; class sem {public : sem() { if ( sem_init( &m_sem, 0 , 0 ) != 0 ) { throw std ::exception(); } } sem(int num) { if ( sem_init( &m_sem, 0 , num ) != 0 ) { throw std ::exception(); } } ~sem() { sem_destroy( &m_sem ); } bool wait () { return sem_wait( &m_sem ) == 0 ; } bool post () { return sem_post( &m_sem ) == 0 ; } private : sem_t m_sem; }; #endif
#ifndef THREADPOOL_H #define THREADPOOL_H #include <list> #include <cstdio> #include <exception> #include <pthread.h> #include "locker.h" template <typename T>class threadpool {public : threadpool(int thread_number = 8 , int max_requests = 10000 ); ~threadpool(); bool append (T* request) ; private : static void * worker (void * arg) ; void run () ; private : int m_thread_number; pthread_t * m_threads; int m_max_requests; std ::list < T* > m_workqueue; locker m_queuelocker; sem m_queuestat; bool m_stop; }; template < typename T >threadpool< T >::threadpool(int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests), m_stop(false ), m_threads(NULL ) { if ((thread_number <= 0 ) || (max_requests <= 0 ) ) { throw std ::exception(); } m_threads = new pthread_t [m_thread_number]; if (!m_threads) { throw std ::exception(); } for ( int i = 0 ; i < thread_number; ++i ) { printf ( "create the %dth thread\n" , i); if (pthread_create(m_threads + i, NULL , worker, this ) != 0 ) { delete [] m_threads; throw std ::exception(); } if ( pthread_detach( m_threads[i] ) ) { delete [] m_threads; throw std ::exception(); } } } template < typename T >threadpool< T >::~threadpool() { delete [] m_threads; m_stop = true ; } template < typename T >bool threadpool< T >::append( T* request ){ m_queuelocker.lock(); if ( m_workqueue.size() > m_max_requests ) { m_queuelocker.unlock(); return false ; } m_workqueue.push_back(request); m_queuelocker.unlock(); m_queuestat.post(); return true ; } template < typename T >void * threadpool< T >::worker( void * arg ){ threadpool* pool = ( threadpool* )arg; pool->run(); return pool; } template < typename T >void threadpool< T >::run() { while (!m_stop) { m_queuestat.wait(); m_queuelocker.lock(); if ( m_workqueue.empty() ) { m_queuelocker.unlock(); continue ; } T* request = m_workqueue.front(); m_workqueue.pop_front(); m_queuelocker.unlock(); if ( !request ) { continue ; } request->process(); } } #endif
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include "locker.h" #include "threadpool.h" #include "http_conn.h" #define MAX_FD 65536 #define MAX_EVENT_NUMBER 10000 extern void addfd ( int epollfd, int fd, bool one_shot ) ;extern void removefd ( int epollfd, int fd ) ;void addsig (int sig, void ( handler )(int )) { struct sigaction sa ; memset ( &sa, '\0' , sizeof ( sa ) ); sa.sa_handler = handler; sigfillset( &sa.sa_mask ); assert( sigaction( sig, &sa, NULL ) != -1 ); } int main ( int argc, char * argv[] ) { if ( argc <= 1 ) { printf ( "usage: %s port_number\n" , basename(argv[0 ])); return 1 ; } int port = atoi( argv[1 ] ); addsig( SIGPIPE, SIG_IGN ); threadpool< http_conn >* pool = NULL ; try { pool = new threadpool<http_conn>; } catch ( ... ) { return 1 ; } http_conn* users = new http_conn[ MAX_FD ]; int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); int ret = 0 ; struct sockaddr_in address ; address.sin_addr.s_addr = INADDR_ANY; address.sin_family = AF_INET; address.sin_port = htons( port ); int reuse = 1 ; setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof ( reuse ) ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof ( address ) ); ret = listen( listenfd, 5 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create( 5 ); addfd( epollfd, listenfd, false ); http_conn::m_epollfd = epollfd; while (true ) { int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ( number < 0 ) && ( errno != EINTR ) ) { printf ( "epoll failure\n" ); break ; } for ( int i = 0 ; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address ; socklen_t client_addrlength = sizeof ( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); if ( connfd < 0 ) { printf ( "errno is: %d\n" , errno ); continue ; } if ( http_conn::m_user_count >= MAX_FD ) { close(connfd); continue ; } users[connfd].init( connfd, client_address); } else if ( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) ) { users[sockfd].close_conn(); } else if (events[i].events & EPOLLIN) { if (users[sockfd].read()) { pool->append(users + sockfd); } else { users[sockfd].close_conn(); } } else if ( events[i].events & EPOLLOUT ) { if ( !users[sockfd].write() ) { users[sockfd].close_conn(); } } } } close( epollfd ); close( listenfd ); delete [] users; delete pool; return 0 ; }
#ifndef HTTPCONNECTION_H #define HTTPCONNECTION_H #include <unistd.h> #include <signal.h> #include <sys/types.h> #include <sys/epoll.h> #include <fcntl.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <sys/stat.h> #include <string.h> #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <sys/mman.h> #include <stdarg.h> #include <errno.h> #include "locker.h" #include <sys/uio.h> class http_conn { public : static const int FILENAME_LEN = 200 ; static const int READ_BUFFER_SIZE = 2048 ; static const int WRITE_BUFFER_SIZE = 1024 ; enum METHOD { GET = 0 , POST, HEAD, PUT, DELETE, TRACE, OPTIONS, CONNECT}; enum CHECK_STATE { CHECK_STATE_REQUESTLINE = 0 , CHECK_STATE_HEADER, CHECK_STATE_CONTENT }; enum HTTP_CODE { NO_REQUEST, GET_REQUEST, BAD_REQUEST, NO_RESOURCE, FORBIDDEN_REQUEST, FILE_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION }; enum LINE_STATUS { LINE_OK = 0 , LINE_BAD, LINE_OPEN }; public : http_conn(){} ~http_conn(){} public : void init (int sockfd, const sockaddr_in& addr) ; void close_conn () ; void process () ; bool read () ; bool write () ; private : void init () ; HTTP_CODE process_read () ; bool process_write ( HTTP_CODE ret ) ; HTTP_CODE parse_request_line ( char * text ) ; HTTP_CODE parse_headers ( char * text ) ; HTTP_CODE parse_content ( char * text ) ; HTTP_CODE do_request () ; char * get_line () { return m_read_buf + m_start_line; } LINE_STATUS parse_line () ; void unmap () ; bool add_response ( const char * format, ... ) ; bool add_content ( const char * content ) ; bool add_content_type () ; bool add_status_line ( int status, const char * title ) ; bool add_headers ( int content_length ) ; bool add_content_length ( int content_length ) ; bool add_linger () ; bool add_blank_line () ; public : static int m_epollfd; static int m_user_count; private : int m_sockfd; sockaddr_in m_address; char m_read_buf[ READ_BUFFER_SIZE ]; int m_read_idx; int m_checked_idx; int m_start_line; CHECK_STATE m_check_state; METHOD m_method; char m_real_file[ FILENAME_LEN ]; char * m_url; char * m_version; char * m_host; int m_content_length; bool m_linger; char m_write_buf[ WRITE_BUFFER_SIZE ]; int m_write_idx; char * m_file_address; struct stat m_file_stat ; struct iovec m_iv [2]; int m_iv_count; int bytes_to_send; int bytes_have_send; }; #endif
#include "http_conn.h" const char * ok_200_title = "OK" ;const char * error_400_title = "Bad Request" ;const char * error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n" ;const char * error_403_title = "Forbidden" ;const char * error_403_form = "You do not have permission to get file from this server.\n" ;const char * error_404_title = "Not Found" ;const char * error_404_form = "The requested file was not found on this server.\n" ;const char * error_500_title = "Internal Error" ;const char * error_500_form = "There was an unusual problem serving the requested file.\n" ;const char * doc_root = "/home/nowcoder/webserver/resources" ;int setnonblocking ( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } void addfd ( int epollfd, int fd, bool one_shot ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLRDHUP; if (one_shot) { event.events |= EPOLLONESHOT; } epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void removefd ( int epollfd, int fd ) { epoll_ctl( epollfd, EPOLL_CTL_DEL, fd, 0 ); close(fd); } void modfd (int epollfd, int fd, int ev) { epoll_event event; event.data.fd = fd; event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP; epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event ); } int http_conn::m_user_count = 0 ;int http_conn::m_epollfd = -1 ;void http_conn::close_conn () { if (m_sockfd != -1 ) { removefd(m_epollfd, m_sockfd); m_sockfd = -1 ; m_user_count--; } } void http_conn::init (int sockfd, const sockaddr_in& addr) { m_sockfd = sockfd; m_address = addr; int reuse = 1 ; setsockopt( m_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof ( reuse ) ); addfd( m_epollfd, sockfd, true ); m_user_count++; init(); } void http_conn::init () { bytes_to_send = 0 ; bytes_have_send = 0 ; m_check_state = CHECK_STATE_REQUESTLINE; m_linger = false ; m_method = GET; m_url = 0 ; m_version = 0 ; m_content_length = 0 ; m_host = 0 ; m_start_line = 0 ; m_checked_idx = 0 ; m_read_idx = 0 ; m_write_idx = 0 ; bzero(m_read_buf, READ_BUFFER_SIZE); bzero(m_write_buf, READ_BUFFER_SIZE); bzero(m_real_file, FILENAME_LEN); } bool http_conn::read () { if ( m_read_idx >= READ_BUFFER_SIZE ) { return false ; } int bytes_read = 0 ; while (true ) { bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0 ); if (bytes_read == -1 ) { if ( errno == EAGAIN || errno == EWOULDBLOCK ) { break ; } return false ; } else if (bytes_read == 0 ) { return false ; } m_read_idx += bytes_read; } return true ; } http_conn::LINE_STATUS http_conn::parse_line () { char temp; for ( ; m_checked_idx < m_read_idx; ++m_checked_idx ) { temp = m_read_buf[ m_checked_idx ]; if ( temp == '\r' ) { if ( ( m_checked_idx + 1 ) == m_read_idx ) { return LINE_OPEN; } else if ( m_read_buf[ m_checked_idx + 1 ] == '\n' ) { m_read_buf[ m_checked_idx++ ] = '\0' ; m_read_buf[ m_checked_idx++ ] = '\0' ; return LINE_OK; } return LINE_BAD; } else if ( temp == '\n' ) { if ( ( m_checked_idx > 1 ) && ( m_read_buf[ m_checked_idx - 1 ] == '\r' ) ) { m_read_buf[ m_checked_idx-1 ] = '\0' ; m_read_buf[ m_checked_idx++ ] = '\0' ; return LINE_OK; } return LINE_BAD; } } return LINE_OPEN; } http_conn::HTTP_CODE http_conn::parse_request_line (char * text) { m_url = strpbrk (text, " \t" ); if (! m_url) { return BAD_REQUEST; } *m_url++ = '\0' ; char * method = text; if ( strcasecmp(method, "GET" ) == 0 ) { m_method = GET; } else { return BAD_REQUEST; } m_version = strpbrk ( m_url, " \t" ); if (!m_version) { return BAD_REQUEST; } *m_version++ = '\0' ; if (strcasecmp( m_version, "HTTP/1.1" ) != 0 ) { return BAD_REQUEST; } if (strncasecmp(m_url, "http://" , 7 ) == 0 ) { m_url += 7 ; m_url = strchr ( m_url, '/' ); } if ( !m_url || m_url[0 ] != '/' ) { return BAD_REQUEST; } m_check_state = CHECK_STATE_HEADER; return NO_REQUEST; } http_conn::HTTP_CODE http_conn::parse_headers (char * text) { if ( text[0 ] == '\0' ) { if ( m_content_length != 0 ) { m_check_state = CHECK_STATE_CONTENT; return NO_REQUEST; } return GET_REQUEST; } else if ( strncasecmp( text, "Connection:" , 11 ) == 0 ) { text += 11 ; text += strspn ( text, " \t" ); if ( strcasecmp( text, "keep-alive" ) == 0 ) { m_linger = true ; } } else if ( strncasecmp( text, "Content-Length:" , 15 ) == 0 ) { text += 15 ; text += strspn ( text, " \t" ); m_content_length = atol(text); } else if ( strncasecmp( text, "Host:" , 5 ) == 0 ) { text += 5 ; text += strspn ( text, " \t" ); m_host = text; } else { printf ( "oop! unknow header %s\n" , text ); } return NO_REQUEST; } http_conn::HTTP_CODE http_conn::parse_content ( char * text ) { if ( m_read_idx >= ( m_content_length + m_checked_idx ) ) { text[ m_content_length ] = '\0' ; return GET_REQUEST; } return NO_REQUEST; } http_conn::HTTP_CODE http_conn::process_read () { LINE_STATUS line_status = LINE_OK; HTTP_CODE ret = NO_REQUEST; char * text = 0 ; while (((m_check_state == CHECK_STATE_CONTENT) && (line_status == LINE_OK)) || ((line_status = parse_line()) == LINE_OK)) { text = get_line(); m_start_line = m_checked_idx; printf ( "got 1 http line: %s\n" , text ); switch ( m_check_state ) { case CHECK_STATE_REQUESTLINE: { ret = parse_request_line( text ); if ( ret == BAD_REQUEST ) { return BAD_REQUEST; } break ; } case CHECK_STATE_HEADER: { ret = parse_headers( text ); if ( ret == BAD_REQUEST ) { return BAD_REQUEST; } else if ( ret == GET_REQUEST ) { return do_request(); } break ; } case CHECK_STATE_CONTENT: { ret = parse_content( text ); if ( ret == GET_REQUEST ) { return do_request(); } line_status = LINE_OPEN; break ; } default : { return INTERNAL_ERROR; } } } return NO_REQUEST; } http_conn::HTTP_CODE http_conn::do_request () { strcpy ( m_real_file, doc_root ); int len = strlen ( doc_root ); strncpy ( m_real_file + len, m_url, FILENAME_LEN - len - 1 ); if ( stat( m_real_file, &m_file_stat ) < 0 ) { return NO_RESOURCE; } if ( ! ( m_file_stat.st_mode & S_IROTH ) ) { return FORBIDDEN_REQUEST; } if ( S_ISDIR( m_file_stat.st_mode ) ) { return BAD_REQUEST; } int fd = open( m_real_file, O_RDONLY ); m_file_address = ( char * )mmap( 0 , m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0 ); close( fd ); return FILE_REQUEST; } void http_conn::unmap () { if ( m_file_address ) { munmap( m_file_address, m_file_stat.st_size ); m_file_address = 0 ; } } bool http_conn::write () { int temp = 0 ; if ( bytes_to_send == 0 ) { modfd( m_epollfd, m_sockfd, EPOLLIN ); init(); return true ; } while (1 ) { temp = writev(m_sockfd, m_iv, m_iv_count); if ( temp <= -1 ) { if ( errno == EAGAIN ) { modfd( m_epollfd, m_sockfd, EPOLLOUT ); return true ; } unmap(); return false ; } bytes_have_send += temp; bytes_to_send -= temp; if (bytes_have_send >= m_iv[0 ].iov_len) { m_iv[0 ].iov_len = 0 ; m_iv[1 ].iov_base = m_file_address + (bytes_have_send - m_write_idx); m_iv[1 ].iov_len = bytes_to_send; } else { m_iv[0 ].iov_base = m_write_buf + bytes_have_send; m_iv[0 ].iov_len = m_iv[0 ].iov_len - temp; } if (bytes_to_send <= 0 ) { unmap(); modfd(m_epollfd, m_sockfd, EPOLLIN); if (m_linger) { init(); return true ; } else { return false ; } } } } bool http_conn::add_response ( const char * format, ... ) { if ( m_write_idx >= WRITE_BUFFER_SIZE ) { return false ; } va_list arg_list; va_start( arg_list, format ); int len = vsnprintf( m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list ); if ( len >= ( WRITE_BUFFER_SIZE - 1 - m_write_idx ) ) { return false ; } m_write_idx += len; va_end( arg_list ); return true ; } bool http_conn::add_status_line ( int status, const char * title ) { return add_response( "%s %d %s\r\n" , "HTTP/1.1" , status, title ); } bool http_conn::add_headers (int content_len) { add_content_length(content_len); add_content_type(); add_linger(); add_blank_line(); } bool http_conn::add_content_length (int content_len) { return add_response( "Content-Length: %d\r\n" , content_len ); } bool http_conn::add_linger () { return add_response( "Connection: %s\r\n" , ( m_linger == true ) ? "keep-alive" : "close" ); } bool http_conn::add_blank_line () { return add_response( "%s" , "\r\n" ); } bool http_conn::add_content ( const char * content ) { return add_response( "%s" , content ); } bool http_conn::add_content_type () { return add_response("Content-Type:%s\r\n" , "text/html" ); } bool http_conn::process_write (HTTP_CODE ret) { switch (ret) { case INTERNAL_ERROR: add_status_line( 500 , error_500_title ); add_headers( strlen ( error_500_form ) ); if ( ! add_content( error_500_form ) ) { return false ; } break ; case BAD_REQUEST: add_status_line( 400 , error_400_title ); add_headers( strlen ( error_400_form ) ); if ( ! add_content( error_400_form ) ) { return false ; } break ; case NO_RESOURCE: add_status_line( 404 , error_404_title ); add_headers( strlen ( error_404_form ) ); if ( ! add_content( error_404_form ) ) { return false ; } break ; case FORBIDDEN_REQUEST: add_status_line( 403 , error_403_title ); add_headers(strlen ( error_403_form)); if ( ! add_content( error_403_form ) ) { return false ; } break ; case FILE_REQUEST: add_status_line(200 , ok_200_title ); add_headers(m_file_stat.st_size); m_iv[ 0 ].iov_base = m_write_buf; m_iv[ 0 ].iov_len = m_write_idx; m_iv[ 1 ].iov_base = m_file_address; m_iv[ 1 ].iov_len = m_file_stat.st_size; m_iv_count = 2 ; bytes_to_send = m_write_idx + m_file_stat.st_size; return true ; default : return false ; } m_iv[ 0 ].iov_base = m_write_buf; m_iv[ 0 ].iov_len = m_write_idx; m_iv_count = 1 ; bytes_to_send = m_write_idx; return true ; } void http_conn::process () { HTTP_CODE read_ret = process_read(); if ( read_ret == NO_REQUEST ) { modfd( m_epollfd, m_sockfd, EPOLLIN ); return ; } bool write_ret = process_write( read_ret ); if ( !write_ret ) { close_conn(); } modfd( m_epollfd, m_sockfd, EPOLLOUT); }
有限状态机 逻辑单元内部的一种高效编程方法:有限状态机(finite state machine)。
有的应用层协议头部包含数据包类型字段,每种类型可以映射为逻辑单元的一种执行状态,服务器可以根据它来编写相应的处理逻辑。如下是一种状态独立的有限状态机:
STATE_MACHINE( Package _pack ) { PackageType _type = _pack.GetType(); switch ( _type ) { case type_A: process_package_A( _pack ); break ; case type_B: process_package_B( _pack ); break ; } }
这是一个简单的有限状态机,只不过该状态机的每个状态都是相互独立的,即状态之间没有相互转移。状态之间的转移是需要状态机内部驱动,如下代码:
STATE_MACHINE() { State cur_State = type_A; while ( cur_State != type_C ) { Package _pack = getNewPackage(); switch ( cur_State ) { case type_A: process_package_state_A( _pack ); cur_State = type_B; break ; case type_B: process_package_state_B( _pack ); cur_State = type_C; break ; } } }
该状态机包含三种状态:type_A、type_B 和 type_C,其中 type_A 是状态机的开始状态,type_C 是状态机的结束状态。状态机的当前状态记录在 cur_State 变量中。在一趟循环过程中,状态机先通过getNewPackage 方法获得一个新的数据包,然后根据 cur_State 变量的值判断如何处理该数据包。数据包处理完之后,状态机通过给 cur_State 变量传递目标状态值来实现状态转移。那么当状态机进入下一趟循环时,它将执行新的状态对应的逻辑。
EPOLLONESHOT事件 即使可以使用 ET 模式,一个socket 上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程在读取完某个 socket 上的数据后开始处理这些数据,而在数据的处理过程中该socket 上又有新数据可读(EPOLLIN 再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个 socket 的局面。一个socket连接在任一时刻都只被一个线程处理,可以使用 epoll 的 EPOLLONESHOT 事件实现。
对于注册了 EPOLLONESHOT 事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用 epoll_ctl 函数重置该文件描述符上注册的 EPOLLONESHOT 事件。这样,当一个线程在处理某个 socket 时,其他线程是不可能有机会操作该 socket 的。但反过来思考,注册了 EPOLLONESHOT 事件的 socket 一旦被某个线程处理完毕, 该线程就应该立即重置这个socket 上的 EPOLLONESHOT 事件,以确保这个 socket 下一次可读时,其 EPOLLIN 事件能被触发,进而让其他工作线程有机会继续处理这个 socket。