[原]I/O复用之 EPOLLONESHOT 事件

王良 18/02/03 19:23:31

EPOLLONESHOT 事件



1. 使用EPOLLONESHOT的原因及优点

即使使用ET模式,一个socket上的某个事件还是可能被触发多次。比如:一个线程在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒用来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。而我们希望一个socket连接在任一时刻都只能被一个线程处理,这就可以通过EPOLLONESHOT事件实现。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,在一个线程使用socket时,其他线程无法操作socket。同样,只有在该socket被处理完后,须立即重置该socket的EPOLLONESHOT事件,以确保这个socket在下次可读时,其EPOLLIN事件能够被触发,进而让其他线程有机会操作这个socket。


2. recv返回值 及 与errno的配合使用

在编写程序时,发现对recv函数返回值意义仍然不清,导致部分代码含义没看懂,在此特别记录一下,以下用ret表示recv函数的返回值。

(1) ret == 0

表示数据已经读完。当客户端不与服务端交互数据好长时间之后,服务端程序会自动断开连接,同时客户端的连接状态变成了 CLOSE_WAIT,如果客户端再向服务端发送数据,然后recv服务端的反馈时,就会造成recv返回0。

(2) ret > 0
表示接收到数据。ret值为接收数据的字节数。

(3) ret < 0
表示recv函数出错。此时分为多种情况,仅介绍以下情况:

  • errno == EAGAIN 表示数据未读完。数据量太大,一次发送导致缓冲区满,需要再次检查是否还有未读取数据。

在此引用其他大佬的思考:
当对侧没有send,即本侧的套接字s的接收缓冲区无数据,返回值是?(EAGAIN,原因为超时,待测)


3. 示例程序

/* EPOLLONESHOT 事件的使用
** 运行命令: g++ filename.cpp -lpthread; ./a.out 127.0.0.1 6666
** 可以使用telnet连接该服务器(telnet 127.0.0.1 6666)
*/


#include <iostream>
#include <cstdio>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sendfile.h>
#include <unistd.h>
#include <signal.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstdlib>
#include <cstring>
#include <fcntl.h>
#include <sys/epoll.h>
#include <pthread.h>
using namespace std;

const int MAX_EVENT_NUMBER = 1024;
const int BUF_SIZE = 1024;

struct fds{
    int epoll_fd;
    int sock_fd;
};

void err( int line ) {
    cout << "error_line: " << line << endl;
}

int setnonblocking( int fd ) {
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epoll_fd, int fd, bool oneshot ) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if ( oneshot ) {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl( epoll_fd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void reset_oneshot( int epoll_fd, int fd ) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl( epoll_fd, EPOLL_CTL_MOD, fd, &event );
}

void * worker( void * arg ) {
    int sock_fd = ( (fds *)arg )->sock_fd;
    int epoll_fd = ( (fds *)arg )->epoll_fd;
    printf("start new thread to receive data on fd: %d\n", sock_fd);
    char buf[BUF_SIZE];
    memset( buf, 0, sizeof( buf ) );

    while( true ) {
        int ret = recv( sock_fd, buf, BUF_SIZE - 1, 0 );
        if ( !ret ) {
            close( sock_fd );
            cout << "foreiner closed the connection\n";
            break;
        } else if ( ret < 0 ) {
            if ( errno == EAGAIN ) {  //数据未读完,需要再次读取
                reset_oneshot( epoll_fd, sock_fd );
                cout << "read later\n";
                break;
            }
        } else {
            printf( "got %d bytes of data: %s\n", ret, buf );
            sleep(5);
        }
    }
    printf("end thread receiving data on fd: %d\n", sock_fd);
}

int main( int argc, char * argv[] ) {
    if ( argc < 3 ) {
        printf( "usage: ./file ip_number port_number\n" );
        return 1;
    }

    const char * ip = argv[1];
    const int port = atoi( argv[2] );

    struct sockaddr_in address;
    memset( &address, 0, sizeof( address ) );
    address.sin_family = AF_INET;
    address.sin_port = htons( port );
    inet_pton( AF_INET, ip, &address.sin_addr );

    int sock_fd = socket( AF_INET, SOCK_STREAM, 0 );
    if ( sock_fd < 0 ) {
        err( __LINE__ );
    }

    int ret = bind( sock_fd, ( struct sockaddr * )&address, sizeof( address ) );
    if ( sock_fd < 0 ) {
        err( __LINE__ );
    }

    ret = listen( sock_fd, 5 );
    if ( sock_fd < 0 ) {
        err( __LINE__ );
    }

    epoll_event events[MAX_EVENT_NUMBER];
    int epoll_fd = epoll_create( 5 );
    if ( epoll_fd < 0 ) {
        err( __LINE__ );
    }

    /* 监听socket的 sock_fd 不能注册 EPOLLONESHOT 事件,否则程序只能处理一个客户连接
    ** 因为后续的客户连接请求将不再触发sock_fd的 EPOLLIN 事件
     */
    addfd( epoll_fd, sock_fd, false );

    while( true ) {
        ret = epoll_wait( epoll_fd, events, MAX_EVENT_NUMBER, -1 );  //等待事件发生
        if ( ret < 0 ) {
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < ret; i++ ) {
            int fd = events[i].data.fd;
            if ( fd == sock_fd ) {  //有新的连接请求
                struct sockaddr_in client;
                socklen_t client_length = sizeof( client );
                int conn_fd = accept( sock_fd, ( struct sockaddr * )&client,
                                        &client_length );

                //对每个非监听文件描述符都注册 EPOLLONESHOT 事件
                //添加的是刚accept的fd
                addfd( epoll_fd, conn_fd, true );
            } else if ( events[i].events & EPOLLIN ) {  //有可读取数据
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epoll_fd = epoll_fd;
                fds_for_new_worker.sock_fd = fd;  //内核事件表中的fd,不要搞混

                //新启动一个线程为sock_fd服务
                pthread_create( &thread, NULL, worker, (void *)&fds_for_new_worker );
            } else {
                printf( "something else happened\n" );
            }
        }
    }

    close(sock_fd);

    return 0;
}
作者:liushall 发表于 2018/02/03 19:23:31 原文链接 http://blog.csdn.net/liushall/article/details/79248879
阅读:20