觉得写的比较好的libevent事件模型 发布日期:2016-11-30 17:13:23     博主推荐★

就直接上代码来。 头文件:libevent_srv.h;

    ifndef __LIBEVENT_TST  

    #define __LIBEVENT_TST  

      

    #define PORT 8800  

    #define BACKLOG 5  

    #define MAX_RTSP_BUFFER 2048  

      

    struct sock_ev {  

        struct event* read_ev;  

        struct event* write_ev;  

        struct event* timer_ev;  

        char* buffer;  

    // record client info     

        char ipDocDecimal[INET_ADDRSTRLEN];  

        unsigned short port;  

    };  

      

    int  GetSessionID();  

    void on_accept(int sock, short event, void* arg);  

    void on_write(int sock, short event, void* arg);  

    void on_read(int sock, short event, void* arg);  

    void Timer_CB(int fd, short event , void *arg);  

    void release_sock_event(struct sock_ev* ev);  

      

    #endif  

代码体: libevent_srv.c

    #include <stdio.h>        // printf 等声明  

    #include <stdlib.h>       // malloc, free 声明  

    #include <string.h>       // memset, strlen 等声明  

    #include <sys/socket.h>   // socket 等声明  

    #include <netinet/in.h>   // struct sockeadd 等声明  

    #include <event.h>        // struct event, event_base 等  

    #include "libevent_srv.h"  

      

      

    //gBase 是基本事件管理员。 对所有连接仅此一个.  

    //因在多个函数中出现,故声明为全局变量  

    struct event_base* gBase;  

    int main(int argc, char* argv[])  

    {  

        int sock = socket(AF_INET, SOCK_STREAM, 0);  

      

        //  设置了该选项后,在父子进程模型中,当子进程为客户服务的时候如果父进程退出,可以重新启动程序完成服务的无缝升级,  

        //  否则在所有父子进程完全退出前再启动程序会在该端口上绑定失败,也即不能完成无缝升级.  

        //  SO_REUSEADDR, 地址可以重用,不必time_wait 2个MSL 时间  (maximum segment lifetime)  

        int yes = 1;  

        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));  

      

        struct sockaddr_in serv_addr;  

        memset(&serv_addr, 0, sizeof(serv_addr));  

        serv_addr.sin_family = AF_INET;  

        serv_addr.sin_port = htons(PORT);  

        serv_addr.sin_addr.s_addr = INADDR_ANY;  

        bind(sock, (struct sockaddr*)&serv_addr, sizeof(struct sockaddr));  

      

        printf("socket listen now\n");  

        listen(sock, BACKLOG);  

      

        //创建一个事件处理的全局变量,可以理解为这是一个负责集中处理各种出入IO事件的总管家,  

        //它负责接收和派发所有输入输出IO事件的信息,这里调用的是函数event_base_new(), 很多程序里这里用的是event_init(),  

        //但event_init 是一个过时的接口,官方建议采用event_base_new()  

        gBase = event_base_new();  

        struct event listen_ev;  

        //listen_en这个事件监听sock这个描述字的读操作,当读消息到达时调用on_accept函数,  

        //EV_PERSIST参数告诉系统持续的监听sock上的读事件,如果不加该参数,每次要监听该事件时就要重复的调用event_add函数,  

        //sock这个描述符是bind到本地的socket端口上,因此其对应的可读事件自然就是来自客户端的连接到达  

        event_set(&listen_ev, sock, EV_READ|EV_PERSIST, on_accept, NULL);  

        //告诉处理IO的管家请留意我们的listen_ev上的事件  

        event_base_set(gBase, &listen_ev);  

        //当有关心的事件到达时,调用on_accept函数  

        event_add(&listen_ev, NULL);  

        //正式启动libevent的事件处理机制,使系统运行起来,  

        //event_base_dispatch是一个无限循环 , 跟windows 编程的事件处理机制还真是相似啊。  

        //通过此例测试可知,dispatch 是个单线程模型,不能够并发工作,当一个事件被处理时,下一个事件不会被同时处理。  

        //当一个事件被长时间处理时,后面的事件会被堆积。总之,事件被顺序的一个一个执行。  

        event_base_dispatch(gBase);  

      

        return 0;  

    }  

    void on_accept(int sock, short event, void* arg)  

    {  

        int sid = GetSessionID();  

        printf("session id:%d\n",sid);  

    //  socket的描述字可以封装一个结构体sock_ev 来保护读、写的事件以及数据缓冲区  

    //  多个客户端因而对应多个newfd, 并对应多个sock_ev 变量  

        struct sock_ev* ev = (struct sock_ev*)malloc(sizeof(struct sock_ev));  

        ev->read_ev = (struct event*)malloc(sizeof(struct event));  

        ev->write_ev = (struct event*)malloc(sizeof(struct event));  

        ev->timer_ev = (struct event*)malloc(sizeof(struct event));  

        ev->buffer = (char*)malloc(MAX_RTSP_BUFFER);  

      

        int sin_size = sizeof(struct sockaddr_in);  

        struct sockaddr_in cli_addr;  

        int newfd = accept(sock, (struct sockaddr*)&cli_addr, &sin_size);  

        char IPDotDecimal[INET_ADDRSTRLEN];  

        struct in_addr addr=cli_addr.sin_addr;  

        inet_ntop(AF_INET,&addr,IPDotDecimal,sizeof(IPDotDecimal));  

        strcpy(ev->ipDocDecimal, IPDotDecimal);  

        ev->port = cli_addr.sin_port;  

        printf("ip:%s, port:%d\n",IPDotDecimal, cli_addr.sin_port);  

      

        //在客户描述字newfd上监听可读事件,当有数据到达是调用on_read函数。read_ev作为参数传递给了on_read函数。  

        //注意:read_ev需要从堆里malloc出来,如果是在栈上分配,那么当函数返回时变量占用的内存会被释放,  

        //此时事件主循环event_base_dispatch会访问无效的内存而导致进程崩溃(即crash);  

        event_set(ev->read_ev, newfd, EV_READ|EV_PERSIST, on_read, ev);  

        event_base_set(gBase, ev->read_ev);  

        event_add(ev->read_ev, NULL);  

        // 增加一个定时器:  

        evtimer_set(ev->timer_ev, Timer_CB, ev);  

        event_base_set(gBase, ev->timer_ev);  

        struct timeval timer_v;   

        timer_v.tv_sec = 2;   

        timer_v.tv_usec = 0;  

        evtimer_add(ev->timer_ev, &timer_v);  

    }  

      

    void on_write(int sock, short event, void* arg)  

    {  

        char* buffer = (char*)arg;  

        send(sock, buffer, strlen(buffer), 0);  

    }  

      

    void on_read(int sock, short event, void* arg)  

    {  

        struct event* write_ev;  

        int size;  

        struct sock_ev* ev = (struct sock_ev*)arg;  

        bzero(ev->buffer, MAX_RTSP_BUFFER);  

        size = recv(sock, ev->buffer, MAX_RTSP_BUFFER, 0);  

            printf("---receive data:---, size:%d\n", size);  

            printf("%s\n", ev->buffer);  

       if (size == 0) {  

        //已经关闭了连接, 释放资源  

            release_sock_event(ev);  

            close(sock);  

            return;  

        }  

    //  在on_read函数中从socket读取数据后程序就可以直接调用write/send接口向客户回写数据了,  

    //  直接调用write/send函数向客户端写数据可能导致程序较长时间阻塞在IO操作上,  

    //  比如socket的输出缓冲区已满,则write/send操作阻塞到有可用的缓冲区之后才能进行实际的写操作,  

    //  而通过注册on_write函数,那么libevent会在合适的时间调用我们的callback函数  

        //在sock 注册 on_write 函数  

        event_set(ev->write_ev, sock, EV_WRITE, on_write, ev->buffer);  

        event_base_set(gBase, ev->write_ev);  

        event_add(ev->write_ev, NULL);  

    }  

     void Timer_CB(int fd, short event , void *arg)  

    {  

        struct sock_ev * ev = (struct sock_ev *) arg;  

        printf("Timer_CB called sleep 2s,emulate a long process\n");  

        sleep(2);  

        struct timeval timer_v;   

        timer_v.tv_sec = 2;   

        timer_v.tv_usec = 0;  

        evtimer_add(ev->timer_ev, &timer_v);  

    }  

      

    void release_sock_event(struct sock_ev* ev)  

    {  

        printf("socket resource released:%s %d\n",ev->ipDocDecimal,ev->port);  

        event_del(ev->read_ev);  

        free(ev->read_ev);  

        free(ev->write_ev);  

        free(ev->timer_ev);  

        free(ev->buffer);  

        free(ev);  

    }  

    int  GetSessionID()  

    {  

        static int sessionID = 1;   

        // 对于成千上万的连接请求,如果有并发存在,需要枷锁保护sessionID 的数据更新  

        // 但libevent 是单线程模型,所以不用加锁。  

        return sessionID++;         

    }  


为了代码的完整性,也贴上socket 客户端程序 client_test.c

    #include <stdio.h>  

    #include <stdlib.h>  

    #include <string.h>  

    #include <sys/socket.h>  

    #include <netinet/in.h>  

      

    #define MAXLINE 256  

    #define SERV_PORT 8800  

      

    void do_loop(FILE *fp, int sockfd)  

    {  

        int n;  

        char sendline[MAXLINE], recvline[MAXLINE + 1];  

      

        while(fgets(sendline, MAXLINE, fp) != NULL)  

        {  

            /* read a line and send to server */  

            write(sockfd, sendline, strlen(sendline));  

            /* receive data from server */  

            n = read(sockfd, recvline, MAXLINE);  

            if(n == -1)  

            {  

                perror("read error");  

                exit(1);  

            }  

            recvline[n] = 0; /* terminate string */  

            printf("\nreceiv:%s\n",recvline);  

        }  

    }  

      

    int main(int argc, char **argv)  

    {  

        int sockfd;  

        struct sockaddr_in servaddr;  

      

        /* check args */  

        if(argc != 2)  

        {  

            printf("usage: udpclient <IPaddress>\n");  

            exit(1);  

        }  

      

        /* init servaddr */  

        bzero(&servaddr, sizeof(servaddr));  

        servaddr.sin_family = AF_INET;  

        servaddr.sin_port = htons(SERV_PORT);  

        if(inet_pton(AF_INET, argv[1], &servaddr.sin_addr) <= 0)  

        {  

            printf("[%s] is not a valid IPaddress\n", argv[1]);  

            exit(1);  

        }  

      

    //  sockfd = socket(AF_INET, SOCK_DGRAM, 0);  

        sockfd = socket(AF_INET, SOCK_STREAM, 0);  

      

        /* connect to server */  

        if(connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1)  

        {  

            perror("connect error");  

            exit(1);  

        }  

        do_loop(stdin, sockfd);  

      

        return 0;  

    }  


博文地址:https://blog.ahamu.cn/blog/detail.html?id=82
   
推荐文章
  • 1
    phpStorm中使用xdebug工具调试dock
    2019/12/09
  • 2
    讲的比较好的B+树执行原理的文章
    2019/12/09
  • 3
    如何用Dockerfile构建镜像
    2019/12/09
  • 4
    Linux服务器内存消耗过高
    2019/10/24
  • 5
    解决现有问题的方式方法
    2019/10/15
  • 6
    PHP对象的深克隆方法
    2019/10/12
  • 7
    ES演示文稿【二】
    2019/09/23
  • 8
    ES演示文稿
    2019/09/23
  • 9
    kibana正则表达式demo
    2019/09/23
  • 10
    聊天有序集合
    2019/09/23
  • 11
    分销员授权登陆逻辑思维导图
    2019/09/18
  • 12
    logstash配置记录
    2019/09/02
  • 13
    比较好用的过滤表情的正则
    2019/08/13
  • 14
    Linux中的screen命令使用
    2019/08/12
  • 15
    网站开发神奇【图片视频压缩网站】
    2019/07/24
  • 16
    windows安装sqlsrv遇到的问题
    2019/07/05
  • 17
    遇到服务空间有一部分没有使用的情况怎么扩展
    2019/06/20
  • 18
    阿里云磁盘空间扩容遇到的问题
    2019/06/20
  • 19
    关于用xhell连接linux虚拟机频繁出现Soc
    2019/06/15
  • 20
    nginx重置多级默认页
    2019/06/12
最喜标签
NYOJ 面试 AJAX ping CentOS 灰度算法 YII2