博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
BufferEvent
阅读量:4036 次
发布时间:2019-05-24

本文共 15192 字,大约阅读时间需要 50 分钟。

bufferevent

bufferevent介绍

  由于我们在网络通信中有很多读写的场景,所以Libevent也提供了相应的bufferevent。这些bufferevent由一个底层的传输对象(socket),一个ReadBuufer,一个WriteBuffer构成。与之前普通的event不同的是bufferevent只有从它的传输对象(fd)读取到了足够多的数据后才会调用它的回调函数。

  bufferevent有四种类型,这四种类型共享一个接口:

  1. 基于socket的bufferevent,这些socket是以流形式的套接字,使用 event_* 的接口,所以它目前不支持udp
  2. 异步I/O的bufferevent ,这个只针对于IOCP
  3. filtering bufferevents (过滤型),这个类型的bufferevent在把数据传输给底层的bufferevent对象前都会对数据进行输入或者输出的处理,比如我们经常的解压或者序列化/反序列化
  4. paired bufferevent ,这种类型的bufferevent可以互相通信,这个像开了一个消息队列或者管道

  当然Libevent并没有实现出这些公共的接口支持所有类型的buffervevent。这些bufferevent的细节可以从event2/bufferevent.h中看到,evbuffer可以从event2/buffer.h看到。

不适用场景

不适用场景一 – 每次处理的数据量过大时

  每次bufferevent都会从tcp的接受缓冲区帮我们最多读取4096个字节,那么其实这样就很显得鸡肋,如果我们去处理大量数据的时候,它最多帮我们读出4096个字节,所以可能它帮我们读出的内容可能不够我们去进行一次数据的处理操作,所以我们只能自己去使用它的Evbuffer缓冲区去读更多的数据去处理业务。

不适用场景二 – 多路复用

  在多路复用的场景下,可能我们每次只从缓冲区处理一小部分数据,然后结束当前处理操作,把剩下的数据交给下一个处理模块处理,这样就可以并发的处理更多的fd(socket)。nginx就是这样的思路,把一个HTTP请求的处理分成多个模块,这样就可以使用多路复用同时处理多个连接的请求。但是在bufferevent中,它直接帮我们读上来了4096个字节,可能把下一个模块的内容也读到当前模块,不利于去模块化划分,所以也不适合多路复用。

鸡肋的超时

  bufferevent可以给相应的read / write 操作设置超时,如果超时触发后没有写或者读就绪就会进入异常逻辑,转而去调用 event 回调函数,这个其实十分不合理,因为我们设置了一个超时给一个readcb 或者 writecb ,超时触发后却调用了eventcb函数。没什么用,只有read和timeout 同时触发时,才会进入正常逻辑,那有read触发时我们要 timeout有毛用?

何为bufferevent

  这个bufferevent就像我们c语言中的标准输入流和标准输出流。在c 语言中FILE流对象,它也是封装了一个文件描述符和一个输入缓冲区和输出缓冲区。那么同理,在bufferevent中当有数据到达的时候,它代替了我们读取数据就像stdin一样,我们并没有调用read系统调用只是调用了scanf数据就会被我们读上来。相同的是在bufferevent中,我们把数据写到output缓冲区后,不同担心何时发送它会自动帮我们发。

  从buffer的设计来看,buffer就是减少我们系统调用I/O函数的次数,将多次数据一次性刷出提高I/O效率,这点在bufferevent中也是有体现到的。

如何自动读取

  由bufferevent内部封装的rdevent去等待读就绪,读就绪后调用evbuffer_read把fd的数据读到Input中,然后判断如果高于read低水位线调用我们自己设置的readcb。每次它最多帮我们读取 4096(EVBUFFER_MAX_READ)个字节。

这里写图片描述

evbuffer 和 bufferevent的关系

  bufferevent中的输入/输出缓冲区类型就是 struct evbuffer。当我们有数据要读,可以把它读到bufferevent的输入缓冲区中,有数据要写同理。

bufferevent的回调函数与水位线

  每一个bufferevent都有一个读callback 和 一个写callback函数。默认,任何读自于bufferevent的底层传输对象的数据都会导致读callback被调用,并且每当有足够的数据从写缓冲区刷新到底层传输对象时写callback被调用,你可以通过调整下面的水位线来达到重写这些回调函数的表现。

1. Read low-water mark : 读取操作使得输入缓冲区的数据量在此级别或者更高时 ,读取回调将被调用。默认值为 0,所以每个读取操作都会导致读取回调被调用。
2. Read high-water mark:如果 bufferevent的输入缓冲区达到这个级别后,bufferevent就停止读取直到有足够的数据被消费使我们低于这个线,才能恢复读取。默认是0,所以我们不会因为输入缓冲区的大小就停止读取数据
3. Write low-water mark:每当一个write (发送)操作导致写缓冲区中的数据小于等于这个线,写callback就会被调用。默认是0,所以基本上不会调用写callback 函数,除非write操作把写缓冲区的数据搬空了,这个时候write callback 函数才会回调。
4. Write high-water mark:没啥用

bufferevent的回调函数中的short类型的参数标志
BEV_EVENT_READING

一个bufferevent发送了读操作,用来查看哪一个event发送了读操作

BEV_EVENT_WRITING

一个bufferevent发送了写操作,用来查看哪一个event发送了写操作

BEV_EVENT_ERROR

当发送了一个error的时候这个标志就会被标记起来,可以通过EVUTIL_SOCKET_ERROR函数来查看

BEV_EVENT_TIMEOUT

当bufferevent发送了超时

BEV_EVENT_EOF

得到了一个EOF

BEV_EVENT_CONNECTED

该bufferevent的socket成为ESTABLELISHED状态

创建bufferevent的标志

  你可以使用以下一个或者多个标记去创建一个bufferevent,以下这些值都是int类型

  1. BEV_OPT_CLOSE_ON_FREE :当free的时候,底层传输的对象会被close
  2. BEV_OPT_THREADSAFE : 自动的给这个bufferevent申请锁,所以设置后bufferevent是线程安全的
  3. BEV_OPT_DEFER_CALLBACKS:延迟callback
  4. BEV_OPT_UNLOCK_CALLBACKS: 默认bufferevent是线程安全的,设置这个选项后不在线程安全
延迟callback

  因为函数的调用都是在栈上,那么bufferEvent就有可能,当输入回调和输出回调之间关系特别复杂时就有有可能在连续的互相调用下发生了栈溢出,所以我们可以设置延迟bufferevent的回调函数。这些被延迟的回调函数会被加入event_loop的队列中,当所有的常规回调调用完毕后,就会调用它们

基于socket的bufferevent(创建一个基于socket的bufferevent)
v2.0.1 struct bufferevent * bufferevent_socket_new(struct event_base * base,evutil_socket_t fd,int bufferevent_options options);

  这个FD是一个可选的套接字文件描述符,而且这个套接字必须是非阻塞模式的套接字,如果这个参数我们设置为-1,表示稍后去设置这个文件描述符,options就是上面介绍的选项。

#include 
#include
#include
#include
void eventcb(struct bufferevent *bev, short events, void *ptr){ if (events & BEV_EVENT_CONNECTED) { /* We’re connected to 127.0.0.1:8080. Ordinarily we’d do something here, like start reading or writing. */ } else if (events & BEV_EVENT_ERROR) { /* An error occured while connecting. */ }}int main_loop(void){ struct event_base *base; struct bufferevent *bev; struct sockaddr_in sin; base = event_base_new(); memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ sin.sin_port = htons(8080); /* Port 8080 */ bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, NULL, NULL, eventcb, NULL); if (bufferevent_socket_connect(bev, (struct sockaddr *)&sin, sizeof(sin)< 0) { /* Error starting connection */ bufferevent_free(bev); return -1; } event_base_dispatch(base); return 0; }

  bufferevent_socket_connect 函数只在 v2.0.2以后有,之前只能手动connect ,如果这个连接成功,bufferevent会收到一个写就绪。如果通过bufferevent_socket_connect发起连接就会收到BEV_EVENT_CONNECTED标志。

使用自己connect的socket并给其绑定的bufferevent加上CONNECTED标志

  如果你自己想调用connect函数还想收到BEV_EVENT_CONNECTED标志,你可以先调用connect(成功调用才行),然后调用bufferevent_socket_connect(bev, NULL, 0)。

以域名的方式发起一个连接

  经常,我们都想把域名解析和连接合并成一个单独的操作

int bufferevent_socket_connect_hostname(struct bufferevent *bev,    struct evdns_base *dns_base, int family, const char *hostname,int port);int bufferevent_socket_get_dns_error(struct bufferevent *bev);

  family指的是地址协议族,目前只支持AF_INET,AF_INET6,AF_UNSPEC。如果这个域名解析失败,它会调用相应 bufferevent的回调函数,如果解析成功以bufferevent_connect的方式发起连接。如果失败的话,我们可以通过bufferevent_socket_get_dns_error来获得错误码。

  如果dns_base参数为空,那么Libevent会阻塞式解析,这个肯定不是我们想要的,如果我们提供参数那么Libevent就会使用它异步的去解析。

/* Don’t actually copy this code: it is a poor way to implement anHTTP client. Have a look at evhttp instead.*/#include 
#include
#include
#include
#include
#include
void readcb(struct bufferevent *bev, void *ptr){ char buf[1024]; int n; struct evbuffer *input = bufferevent_get_input(bev); while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) { fwrite(buf, 1, n, stdout); }}void eventcb(struct bufferevent *bev, short events, void *ptr){ if (events & BEV_EVENT_CONNECTED) { printf("Connect okay.\n"); } else if (events & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) { struct event_base *base = ptr; if (events & BEV_EVENT_ERROR) { int err = bufferevent_socket_get_dns_error(bev); if (err) printf("DNS error: %s\n", evutil_gai_strerror(err)); } printf("Closing\n"); bufferevent_free(bev); event_base_loopexit(base, NULL); }}int main(int argc, char **argv){ struct event_base *base; struct evdns_base *dns_base; struct bufferevent *bev; if (argc != 3) { printf("Trivial HTTP 0.x client\n" "Syntax: %s [hostname] [resource]\n" "Example: %s www.google.com /\n",argv[0],argv[0]); return 1; } base = event_base_new(); dns_base = evdns_base_new(base, 1); bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, readcb, NULL, eventcb, base); bufferevent_enable(bev, EV_READ|EV_WRITE); evbuffer_add_printf(bufferevent_get_output(bev), "GET %s\r\n", argv[2]); bufferevent_socket_connect_hostname(bev, dns_base, AF_UNSPEC, argv[1], 80); event_base_dispatch(base); return 0;}

  bufferEvent是基于引用计数的,所以当有未处理的延迟callback函数时,bufferevent不会立即释放,直到这个回调函数调用完毕。如果我们设置了BEV_OPT_CLOSE_ON_FREE 标志,在free的同时也会close 掉 bufferEvent对应的fd。

设置bufferevent的callback、watermark、enabled operations
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);typedef void (*bufferevent_event_cb)(struct bufferevent *bev,short events, void *ctx);void bufferevent_setcb(struct bufferevent *bufev,bufferevent_data_cb readcb, bufferevent_data_cb writecb,bufferevent_event_cb eventcb, void *cbarg); 1.4.4void bufferevent_getcb(struct bufferevent *bufev,bufferevent_data_cb *readcb_ptr,bufferevent_data_cb *writecb_ptr,bufferevent_event_cb *eventcb_ptr,void **cbarg_ptr);2.1.1

  bufferevent_setcb函数是用来设置buffferevent的回调函数的一个函数。其中readcb、writecb、event是读就绪,写就绪,event就绪(flag标志条件就绪,flag标志是上面提到的error\timeout\eof\connect事件) 的回调函数。其中cbarg参数被那三个函数共享,所以一旦改变三个函数都收影响,event_cb的short参数是bufferevent的flag标志的集合。你可以通过传递NULL来表示你关闭了相应的回调函数,以此来表示你不关心相应的事件。

  其实我们给bufferevent设置的回调函数并不是注册到了其event上,bufferevent其实底层封装了一层event,它有自己的callback函数。我们设置的callback函数只是根据底层event的相应反映而调用的。
  注意当有错误发生了,就触发event回调函数,这个时候如果因为read操作触发的错误,那么read功能就会被关闭,因为write触发的错误write功能就会被关闭

bufferevent的读写功能的开启与关闭

  我们可以通过EV_READ,EV_WRITE或EV_READ|EV_WRITE去开启或者关闭一个bufferevent的读写功能。当读或者写功能关闭了,这个bufferevent将不会再尝试去读或者写在fd上。

  当bufferevent的输出缓冲区为空的时候,我们不需要主动的去关闭写功能,这个时候它自动关闭写功能并且当有数据到来时它会自动重启,但是当写缓冲区数据低于write-low-water的时候write-callback函数会被调用,这个时候写缓冲区数据就会被填充然后再度开启写功能。写入同理当输入缓冲区的数据超过read-high-water的时候我们也不用主动关闭读功能,因为bufferevent会自动关闭读功能,当有更多的空间的时候再开启读功能。默认的一个新创建的bufferevent是只有写功能没有读功能的,你可以主动调用打开读功能

开启或关闭读写功能

void bufferevent_enable(struct bufferevent * bufev,short events); void bufferevent_disable(struct bufferevent * bufev,short events); short bufferevent_get_enabled(struct bufferevent * bufev); //查看bev是否有开启某些功能

  开启或关闭一个选项相当于在bufev对应的event_base中注册或者删除一个相应的event事件,如果EV_READ|EV_WRITE都关闭了且该event_base只注册了bufferevent,那么循环会终止已经没有注册的事件。

  当我们enable一个选项的时候,这个EV_READ 或者 EV_WRITE 是以持久化的形式(EV_PERSIST)的形式注册到了其相应的event_base中。

设置水位线

  水位线的设置根据标志所决定如下所示

  1. EV_READ 表示设置读水位线
  2. EV_WRITE 表示设置写水位线
  3. EV_READ | EV_WRTE 表示设置读和写的水位线

  如果我们设置highmark为0就是表示不限制高水位线

void bufferevent_setwatermark(struct bufferevent * bufev,short events,size_t lowmark,size_t highmark); 1.4.4
设置水位线demo
#include 
#include
#include
#include
#include
#include
#include
struct info { const char *name; size_t total_drained;};void read_callback(struct bufferevent *bev, void *ctx){ struct info *inf = ctx; struct evbuffer *input = bufferevent_get_input(bev); size_t len = evbuffer_get_length(input); if (len) { inf->total_drained += len; evbuffer_drain(input, len); printf("Drained %lu bytes from %s\n", (unsigned long) len, inf->name); }}void event_callback(struct bufferevent *bev, short events, void *ctx){ struct info *inf = ctx; struct evbuffer *input = bufferevent_get_input(bev); int finished = 0; if (events & BEV_EVENT_EOF) { size_t len = evbuffer_get_length(input); printf("Got a close from %s. We drained %lu bytes from it, " "and have %lu left.\n", inf->name, (unsigned long)inf->total_drained, (unsigned long)len); finished = 1; } if (events & BEV_EVENT_ERROR) { printf("Got an error from %s: %s\n", inf->name, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); finished = 1; } if (finished) { free(ctx); bufferevent_free(bev); }}struct bufferevent *setup_bufferevent(void){ struct bufferevent *b1 = NULL; struct info *info1; info1 = malloc(sizeof(struct info)); info1->name = "buffer 1"; info1->total_drained = 0; /* ... Here we should set up the bufferevent and make sure it gets connected... */ /* Trigger the read callback only whenever there is at least 128 bytes of data in the buffer. */ bufferevent_setwatermark(b1, EV_READ, 128, 0); bufferevent_setcb(b1, read_callback, NULL, event_callback, info1); bufferevent_enable(b1, EV_READ); /* Start reading. */ return b1;}

如何使用bufferevent去读写网络数据

  上层应用都是在输入缓冲区中读取数据,在输出缓冲区中写入数据,所以输入缓冲区数据应该越来越少,输出缓冲区数据应该越来越多。当输出缓冲区为空的时候,bufferevent自动停止写入功能,输入缓冲区数据大于高水位线的时候自动停止读功能,当数据量恢复正常的时候,它们也恢复正常,这些就是读写的一些机制。下面介绍返回俩个缓冲区的接口

struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);

  struct evbuffer 就是具体的缓冲区的类型。那么我们可以通过这个结构体来跟bufferevent来进行数据交互,下面是数据交互的接口

int bufferevent_write(struct bufferevent *Dstbufev,const void *Srcdata, size_t size);int bufferevent_write_buffer(struct bufferevent *Dstbufev,struct evbuffer *Srcbuf);size_t bufferevent_read(struct bufferevent *Srcbufev, void *DstData, size_t size);int bufferevent_read_buffer(struct bufferevent *SrcBufev,struct evbuffer *DstBuf);

  write类的函数都是把用户缓冲区的数据添加到OutPut 的缓冲区中的末尾。read类的函数都是把相应bufferevent中的数据移动到用户的缓冲区中,如果InPut缓冲区数据为0则函数返回0。

write操作的注意点

  如果我们调用bufferevent_write函数的时候往写缓冲区中写数据了,但是写的数据过少的时候,write 操作就会把写缓冲区内的所有数据都发送出去,这个时候写缓冲区空了就会触发写缓冲区的回调函数补充数据,write操作又因为写缓冲区内又数据再次搬移,这个时候就会造成死循环

读写交互demo
#include 
#include
#include
voidread_callback_uppercase(struct bufferevent *bev, void *ctx){/* This callback removes the data from bev’s input buffer 128 bytes at a time, uppercases it, and starts sending itback.(Watch out! In practice, you shouldn’t use toupper to implementa network protocol, unless you know for a fact that the currentlocale is the one you want to be using.)*/char tmp[128];size_t n;int i;while (1) {n = bufferevent_read(bev, tmp, sizeof(tmp));if (n <= 0)break; /* No more data. */for (i=0; i
other_bev));}struct count {unsigned long last_fib[2];};voidwrite_callback_fibonacci(struct bufferevent *bev, void *ctx){ /* Here’s a callback that adds some Fibonacci numbers to the output buffer of bev. It stops once we have added 1k of data; once this data is drained, we’ll add more. */ struct count *c = ctx; struct evbuffer *tmp = evbuffer_new(); while (evbuffer_get_length(tmp) < 1024) { unsigned long next = c->last_fib[0] + c->last_fib[1]; c->last_fib[0] = c->last_fib[1]; c->last_fib[1] = next; evbuffer_add_printf(tmp, "%lu", next); } /* Now we add the whole contents of tmp to bev. */ bufferevent_write_buffer(bev, tmp); /* We don’t need tmp any longer. */ evbuffer_free(tmp);}
bufferevent 的超时机制

我们可以设置一个timeout的时机,如果这个时间都超时了,bufferevent都没有进行相应的read/write操作,那么就会触发这个bufferevent的callback函数,并且会设置下面的标志

BEV_EVENT_TIMEOUT|BEV_EVENT_READING或者BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING

  timeout机制生效的前提下是bufferevent没有关闭相应的read/write的功能。如果bufferevent关闭了read功能或者 Input缓冲区超过了high-water的时候,read-timeout失效。如果bufferevent关闭了write功能或者Output缓冲区空了的时候,write-timeout失效。失效后会触发我们设置的event回调函数,如果此时event_base 没有进程EV_WRITE操作或者其他的event事件则循环退出。

  下面是设置超时的接口函数:

void bufferevent_set_timeouts(struct bufferevent *bufev,const struct timeval *timeout_read, const struct timeval *timeout_write);v2.0.4 才在各个bufferevent类型中表现一致

  注意当给EV_READ添加超时事件,当超时触发后底层就进行read操作,这个时候如果socket没有数据就会阻塞住,所以得设置该socket为非阻塞socket,如果非阻塞socket那么该bufferevent会再次被调度

  其次这个超时真的很鸡肋,如果超时事件触发的同时EV_READ|EV_WRITE事件也触发了,那么该read / write 操作正常进行,如果没有只有timeout事件触发了当做异常处理,会调用event 回调函数,也就是说我们给 readcb 设置一个timeout,如果超时了却调用的是eventcb 这是十分不合理的。

刷新bufferevent

int bufferevent_flush(struct bufferevent *bufev,short iotype, enum bufferevent_flush_mode state);-1 : 失败0 : 无数据可刷1 : 成功刷了一些数据
  1. iotype:EV_READ, EV_WRITE, or EV_READ|EV_WRITE
  2. state : BEV_NORMAL, BEV_FLUSH, or BEV_FINISHED
      FIN就像 tcp的fin段一样,它给告诉对端Libevent别发送数据了。当我们使用刷新的时候,数据就会被强制的进行 read / write操作。一个很重要的是,这个函数不支持 socket类型的bufferevent。
特定类型的bufferevent的操纵函数

下面四个函数,前三个只针对 socket类型的bufferevent。这些函数可以设置优先级和设置fd。

int bufferevent_priority_set(struct bufferevent *bufev, int pri); int bufferevent_get_priority(struct bufferevent *bufev); v2.1.2 int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd);evutil_socket_t bufferevent_getfd(struct bufferevent *bufev); 2.0.2struct event_base *bufferevent_get_base(struct bufferevent *bev); 2.0.9struct bufferevent *bufferevent_get_underlying(struct bufferevent *bufev); 2.0.2
bufferevent 的lock函数

  由于输入输出缓冲区的存在,有的时候我们想操纵bufferevent的时候是具有原子性的,那么我们可以使用Libevent给我们提供的专门针对于 bufferevent的加锁函数

void bufferevent_lock(struct bufferevent * bufev); void bufferevent_unlock(struct bufferevent * bufev);

  第一个函数,如果我们已经给一个bufferevent加了锁再次调用也是安全的。第二个函数,我们只能解锁一次,否则出错。如果没有设置BEV_OPT_THREADSAFE选项,调这个函数没有什么影响

你可能感兴趣的文章
Linux usb 设备驱动 (1)
查看>>
解决跨网场景下,CAS重定向无法登录的问题(无需修改现有代码)
查看>>
java反编译命令
查看>>
activemq依赖包获取
查看>>
概念区别
查看>>
关于静态块、静态属性、构造块、构造方法的执行顺序
查看>>
final 的作用
查看>>
在Idea中使用Eclipse编译器
查看>>
idea讲web项目部署到tomcat,热部署
查看>>
IDEA Properties中文unicode转码问题
查看>>
Idea下安装Lombok插件
查看>>
zookeeper
查看>>
Idea导入的工程看不到src等代码
查看>>
技术栈
查看>>
Jenkins中shell-script执行报错sh: line 2: npm: command not found
查看>>
8.X版本的node打包时,gulp命令报错 require.extensions.hasownproperty
查看>>
Jenkins 启动命令
查看>>
Maven项目版本继承 – 我必须指定父版本?
查看>>
Maven跳过单元测试的两种方式
查看>>
通过C++反射实现C++与任意脚本(lua、js等)的交互(二)
查看>>