


    • 一、为什么用户层需要设计网络缓冲区
      • 1)设计读缓冲区的必要
      • 2)设计写缓冲区的必要
      • 3)普通buf+pos的问题
      • 4)ringbuffer(只解决了普通buf移动频繁的问题)
        • (1)头文件
        • (2).c文件
    • 二、ringbuffer以及多线程环境下数据安全思考
      • 1)1读1写->内存屏障
      • 2) 多线程(多读多写->需要加锁且加内存屏障)
      • 3)不需要加内存屏障的情况(已用pthread_spinlock等锁保护时,无需再额外加内存屏障)
    • 三、libevent中数据缓冲Bufferevent的设计
      • 1)背景介绍(bufferevent\evbuffer\evbuffer_chain)
      • 2)解释
      • 3)源码展示

  • 使用场景举例
    kFIFO、 无锁队列、网络缓冲区设计









#ifndef _RINGBUFFER_H__
#define _RINGBUFFER_H__

#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <stdio.h>

/* Yields the smaller of two values.
 * This is a function-like macro, so each argument may be evaluated twice:
 * do not pass expressions with side effects (e.g. min(i++, n)). */
#define min(lth , rth) ((lth) < (rth) ? (lth): (rth))

/**
 * Byte-oriented ring buffer.
 *
 * read_pos and write_pos are free-running 32-bit counters: they are never
 * reduced modulo the capacity, they simply wrap from 2^32-1 back to 0.
 * The number of stored bytes is write_pos - read_pos (unsigned arithmetic
 * makes this correct across the wrap), and the physical index into buf is
 * obtained by masking a counter with (size - 1). That masking only works
 * when size is a power of two.
 */
typedef struct
{
    uint8_t* buf;       /* backing storage */
    uint32_t size;      /* capacity in bytes; must be a power of two */
    uint32_t read_pos;  /* free-running read counter, wraps at 2^32 */
    uint32_t write_pos; /* free-running write counter, wraps at 2^32 */
} ringbuffer_t;

/* Prepares t for use with a requested capacity of sz bytes: allocates the
 * backing storage and resets the read/write positions. */
void rb_init(ringbuffer_t* t ,uint32_t sz);
/* Non-zero when read_pos equals write_pos, i.e. nothing is stored. */
int32_t rb_is_empty(ringbuffer_t* t);
/* Non-zero when write_pos - read_pos equals the capacity. */
int32_t  rb_is_full(ringbuffer_t* t);
/* Tears the buffer down: clears the storage pointer and zeroes the size and
 * both positions. */
void rb_free(ringbuffer_t* t);
/* Number of bytes currently stored (write_pos - read_pos). */
uint32_t rb_length(ringbuffer_t* t);
/* Number of bytes that can still be written (size - rb_length). */
uint32_t rb_reamin(ringbuffer_t* t);

/* Copies src_sz bytes from src_buf into the ring. All-or-nothing: when
 * src_sz exceeds the free space nothing is copied and -1 is returned, which
 * the uint32_t return type turns into UINT32_MAX. Otherwise returns src_sz. */
uint32_t rb_write(ringbuffer_t* dst_t,void* src_buf,uint32_t src_sz);
/* Copies up to dst_sz bytes out of the ring into dst_buf and returns the
 * number of bytes copied. There is also an early "return -1" path
 * (UINT32_MAX as uint32_t) -- presumably taken when the ring is empty;
 * confirm against the implementation. */
uint32_t rb_read(ringbuffer_t* src_t,void* dst_buf,uint32_t dst_sz);

/**
 * Rounds sz up to the nearest power of two (e.g. 5 -> 8, 8 -> 8).
 *
 * @param sz  requested size
 * @return    the smallest power of two that is >= sz, with a floor of 2
 *            (inputs 0 and 1 yield 2); 0 when sz is larger than 2^31,
 *            because the result would not fit in a uint32_t.
 */
static inline uint32_t roundup_power_of_two(uint32_t sz)
{
    if (sz < 2) {
        return 2;
    }

    /* 2^31 is the largest power of two representable in 32 bits. */
    if (sz > (UINT32_C(1) << 31)) {
        return 0;
    }

    /* Doubling an unsigned value avoids the signed "1 << i" shift, which is
     * undefined once i reaches the width of int. */
    uint32_t result = 2;
    while (result < sz) {
        result <<= 1;
    }
    return result;
}

/**
 * Tests whether sz is a power of two.
 *
 * @param sz  value to test
 * @return    1 if sz is a power of two, 0 if it is not, and -1 when sz < 2
 *            (treated as an invalid size).
 *
 * Note that -1 is truthy in C: callers that must reject sizes below 2 have
 * to compare the result against 1 rather than use it as a plain boolean.
 */
static inline int32_t  is_power_of_two(uint32_t sz)
{
    if (sz < 2) {
        return -1;
    }

    /* A power of two has exactly one bit set, so clearing the lowest set bit
     * with sz & (sz - 1) leaves zero. The same mask is why, for such n,
     * m % n can be computed as m & (n - 1). */
    return (sz & (sz - 1)) == 0;
}

#include "ringbuffer.h"

/**
 * Reports whether the ring buffer holds no data.
 *
 * @param t  ring buffer to inspect
 * @return   1 when the read and write positions coincide, 0 otherwise
 */
int32_t rb_is_empty(ringbuffer_t* t)
{
    return t->read_pos == t->write_pos;
}

/**
 * Reports whether the ring buffer has no free space left.
 *
 * The positions are free-running uint32_t counters, so the fill level is
 * simply write_pos - read_pos; unsigned wrap-around keeps the difference
 * correct even after a counter overflows past 2^32-1. No modulo is needed
 * here, but indexing elsewhere relies on the capacity being a power of two.
 *
 * @param t  ring buffer to inspect
 * @return   1 when the fill level equals the capacity, 0 otherwise
 */
int32_t  rb_is_full(ringbuffer_t* t)
{
    return t->size == t->write_pos - t->read_pos;
}

/**
 * Initializes a ring buffer able to hold at least sz bytes.
 *
 * The capacity is rounded up to a power of two, because every index is
 * computed as pos & (size - 1). On failure (capacity not representable, or
 * allocation failure) the buffer is left with buf == NULL and size == 0, so
 * every non-empty write is rejected and every read reports empty.
 *
 * @param t   ring buffer to initialize; any previous contents are not freed
 * @param sz  requested capacity in bytes
 */
void rb_init(ringbuffer_t* t ,uint32_t sz)
{
    uint32_t capacity = roundup_power_of_two(sz);

    t->buf = NULL;
    t->size = 0;
    /* Start just below the uint32_t limit -- presumably chosen so that the
     * counters wrap after a few bytes and the overflow path is exercised
     * right away. Any equal pair of start values would be valid. */
    t->read_pos = t->write_pos = UINT32_MAX - 2;

    /* A result smaller than the request means the rounded capacity does not
     * fit in 32 bits. */
    if (capacity == 0 || capacity < sz) {
        return;
    }

    /* Elements are single bytes, so the byte count is the capacity itself. */
    t->buf = malloc(capacity);
    if (t->buf == NULL) {
        return;
    }
    t->size = capacity;
}

/**
 * Releases the storage owned by the ring buffer and resets it to an empty,
 * zero-capacity state. Safe to call on a buffer whose buf is already NULL.
 *
 * @param t  ring buffer to tear down
 */
void rb_free(ringbuffer_t* t)
{
    free(t->buf);   /* free(NULL) is a no-op, so no guard is needed */
    t->buf = NULL;  /* prevents use-after-free and double free */

    t->read_pos = t->write_pos = t->size = 0;
}


/**
 * Returns the number of bytes currently stored in the ring buffer.
 *
 * Unsigned subtraction keeps the result correct even when write_pos has
 * wrapped past 2^32-1 and read_pos has not.
 *
 * @param t  ring buffer to inspect
 * @return   stored byte count
 */
uint32_t rb_length(ringbuffer_t* t)
{
    return t->write_pos - t->read_pos;
}

/**
 * Returns the number of bytes that can still be written before the ring
 * buffer is full.
 *
 * @param t  ring buffer to inspect
 * @return   free byte count (capacity minus stored bytes)
 */
uint32_t rb_reamin(ringbuffer_t* t)
{
    return t->size - rb_length(t);
}

/**
 * Appends src_sz bytes from src_buf to the ring buffer.
 *
 * The write is all-or-nothing: if the data does not fit in the free space,
 * nothing is copied.
 *
 * @param dst_t    destination ring buffer
 * @param src_buf  bytes to append
 * @param src_sz   number of bytes to append
 * @return         src_sz on success; (uint32_t)-1, i.e. UINT32_MAX, when
 *                 src_sz exceeds the free space
 */
uint32_t rb_write(ringbuffer_t* dst_t,void* src_buf,uint32_t src_sz)
{
    if (src_sz > rb_reamin(dst_t)) {
        return (uint32_t)-1;
    }
    if (src_sz == 0) {
        return 0;   /* nothing to copy; avoids memcpy on an unallocated buf */
    }

    /* Arithmetic on void* is a compiler extension; use a byte pointer. */
    const uint8_t *src = src_buf;

    /* Fold the free-running write counter into the physical buffer range. */
    uint32_t offset = dst_t->write_pos & (dst_t->size - 1);
    /* Bytes that fit between the write offset and the end of the storage. */
    uint32_t head = min(src_sz, dst_t->size - offset);

    memcpy(dst_t->buf + offset, src, head);
    /* Whatever did not fit wraps around to the start of the storage; the
     * length is 0 (never negative) when no wrap is needed. */
    memcpy(dst_t->buf, src + head, src_sz - head);

    dst_t->write_pos += src_sz;
    return src_sz;
}

/**
 * Removes up to dst_sz bytes from the ring buffer and copies them to dst_buf.
 *
 * @param src_t    source ring buffer
 * @param dst_buf  destination with room for at least dst_sz bytes
 * @param dst_sz   maximum number of bytes to read
 * @return         number of bytes actually copied (may be less than dst_sz);
 *                 (uint32_t)-1, i.e. UINT32_MAX, when the buffer is empty
 */
uint32_t rb_read(ringbuffer_t* src_t,void* dst_buf,uint32_t dst_sz)
{
    if (rb_is_empty(src_t)) {
        return (uint32_t)-1;
    }

    /* Arithmetic on void* is a compiler extension; use a byte pointer. */
    uint8_t *dst = dst_buf;

    /* Never hand out more than is stored. */
    uint32_t count = min(dst_sz, rb_length(src_t));

    /* Physical index of the read counter inside the storage. */
    uint32_t offset = src_t->read_pos & (src_t->size - 1);
    /* Bytes available between the read offset and the end of the storage. */
    uint32_t head = min(count, src_t->size - offset);

    memcpy(dst, src_t->buf + offset, head);
    /* The remainder, if any, wrapped around to the start of the storage. */
    memcpy(dst + head, src_t->buf, count - head);

    src_t->read_pos += count;

    return count;
}


int main()
    ringbuffer_t rb;

    uint32_t put1 = rb_write(&rb,(void*)("mark"),4);
    printf("put1 = %u,wr = %u,rd = %u,length = %u\n",put1,rb.write_pos,rb.read_pos,rb_length(&rb));

    return 0;



/**
 * Barrier-protected variant of rb_write for one writer thread paired with
 * one reader thread.
 *
 * smp_mb() and smp_wmb() are not standard C: they must be supplied by the
 * platform (the original note suggests atomic_thread_fence() as the
 * user-space counterpart).
 *
 * @param dst_t    destination ring buffer
 * @param src_buf  bytes to append
 * @param src_sz   number of bytes to append
 * @return         src_sz on success; (uint32_t)-1, i.e. UINT32_MAX, when
 *                 src_sz exceeds the free space (nothing is copied)
 */
uint32_t rb_write(ringbuffer_t* dst_t,void* src_buf,uint32_t src_sz)
{
    if (src_sz > rb_reamin(dst_t)) {
        return (uint32_t)-1;
    }
    if (src_sz == 0) {
        return 0;   /* nothing to copy or publish */
    }

    /* The free space was computed from read_pos above; make sure that sample
     * is taken before any byte is stored into the ring. */
    smp_mb();

    /* Arithmetic on void* is a compiler extension; use a byte pointer. */
    const uint8_t *src = src_buf;

    /* Fold the free-running write counter into the physical buffer range. */
    uint32_t offset = dst_t->write_pos & (dst_t->size - 1);
    /* Bytes that fit between the write offset and the end of the storage. */
    uint32_t head = min(src_sz, dst_t->size - offset);

    memcpy(dst_t->buf + offset, src, head);
    /* The remainder wraps to the start; the length is 0 when no wrap occurs. */
    memcpy(dst_t->buf, src + head, src_sz - head);

    /* The payload stores must be visible before write_pos advances,
     * otherwise the reader could see the new position but stale bytes. */
    smp_wmb();

    dst_t->write_pos += src_sz;

    return src_sz;
}

/**
 * Barrier-protected variant of rb_read for one reader thread paired with
 * one writer thread.
 *
 * smp_mb() is not standard C: it must be supplied by the platform (the
 * original note suggests atomic_thread_fence() as the user-space
 * counterpart).
 *
 * @param src_t    source ring buffer
 * @param dst_buf  destination with room for at least dst_sz bytes
 * @param dst_sz   maximum number of bytes to read
 * @return         number of bytes actually copied (may be less than dst_sz);
 *                 (uint32_t)-1, i.e. UINT32_MAX, when the buffer is empty
 */
uint32_t rb_read(ringbuffer_t* src_t,void* dst_buf,uint32_t dst_sz)
{
    /* Sample the fill level exactly once, before the barrier, so that the
     * emptiness check and the byte count agree on the same write_pos. */
    uint32_t available = rb_length(src_t);
    if (available == 0) {
        return (uint32_t)-1;
    }
    uint32_t count = min(dst_sz, available);

    /* write_pos must be sampled before any payload byte is read. */
    smp_mb();

    /* Arithmetic on void* is a compiler extension; use a byte pointer. */
    uint8_t *dst = dst_buf;

    /* Physical index of the read counter inside the storage. */
    uint32_t offset = src_t->read_pos & (src_t->size - 1);
    /* Bytes available between the read offset and the end of the storage. */
    uint32_t head = min(count, src_t->size - offset);

    memcpy(dst, src_t->buf + offset, head);
    /* The remainder, if any, wrapped around to the start of the storage. */
    memcpy(dst + head, src_t->buf, count - head);

    /* The copies above are loads from the ring; they must complete before
     * read_pos advances and hands the space back to the writer. Ordering
     * loads against a later store needs a full barrier, not a write barrier. */
    smp_mb();

    src_t->read_pos += count;

    return count;
}


2) 多线程(多读多写->需要加锁且加内存屏障)




  • 基础概念
    每个 bufferevent 都有一个输入缓冲区和一个输出缓冲区 ,它们的类型都是“struct evbuffer”。 有数据要写入到 bufferevent 时,添加数据到输出缓冲区 ;bufferevent 中有数据供读取的时候,从输入缓冲区抽取(drain)数据。evbuffer 接口支持很多种操作,后面的章节将讨论这些操作。


结构体bufferevent中的evbuffer是一个链表结构:链表的每个节点都是一个struct evbuffer_chain,各自挂着不同的缓冲区,节点中的unsigned char *buffer指向实际分配的内存空间。


  • bufferevent结构体
/** A buffered I/O endpoint: pairs an input and an output evbuffer with the
 * events, watermarks, callbacks and timeouts declared below. */
struct bufferevent {
	/** Event base for which this bufferevent was created. */
	struct event_base *ev_base;
	/** Pointer to a table of function pointers to set up how this
	    bufferevent behaves. */
	const struct bufferevent_ops *be_ops;

	/** A read event that triggers when a timeout has happened or a socket
	    is ready to read data.  Only used by some subtypes of
	    bufferevent. */
	struct event ev_read;
	/** A write event that triggers when a timeout has happened or a socket
	    is ready to write data.  Only used by some subtypes of
	    bufferevent. */
	struct event ev_write;

	/** An input buffer. Only the bufferevent is allowed to add data to
	    this buffer, though the user is allowed to drain it. */
	struct evbuffer *input;

	/** An output buffer. Only the bufferevent is allowed to drain data
	    from this buffer, though the user is allowed to add it. */
	struct evbuffer *output;

	struct event_watermark wm_read;
	struct event_watermark wm_write;

	bufferevent_data_cb readcb;
	bufferevent_data_cb writecb;
	/* This should be called 'eventcb', but renaming it would break
	 * backward compatibility */
	bufferevent_event_cb errorcb;
	void *cbarg;

	struct timeval timeout_read;
	struct timeval timeout_write;

	/** Events that are currently enabled: currently EV_READ and EV_WRITE
	    are supported. */
	short enabled;
};
  • evbuffer结构体
struct evbuffer {
	/** The first chain in this buffer's linked list of chains. */
	struct evbuffer_chain *first;
	/** The last chain in this buffer's linked list of chains. */
	struct evbuffer_chain *last;

	/** Pointer to the next pointer pointing at the 'last_with_data' chain.
	 * To unpack:
	 * The last_with_data chain is the last chain that has any data in it.
	 * If all chains in the buffer are empty, it is the first chain.
	 * If the buffer has no chains, it is NULL.
	 * The last_with_datap pointer points at _whatever 'next' pointer_
	 * pointing at the last_with_data chain. If the last_with_data chain
	 * is the first chain, or it is NULL, then the last_with_datap pointer
	 * is &buf->first.
	struct evbuffer_chain **last_with_datap;

	/** Total amount of bytes stored in all chains.*/
	size_t total_len;

	/** Number of bytes we have added to the buffer since we last tried to
	 * invoke callbacks. */
	size_t n_add_for_cb;
	/** Number of bytes we have removed from the buffer since we last
	 * tried to invoke callbacks. */
	size_t n_del_for_cb;

	/** A lock used to mediate access to this buffer. */
	void *lock;
	/** True iff we should free the lock field when we free this
	 * evbuffer. */
	unsigned own_lock : 1;
	/** True iff we should not allow changes to the front of the buffer
	 * (drains or prepends). */
	unsigned freeze_start : 1;
	/** True iff we should not allow changes to the end of the buffer
	 * (appends) */
	unsigned freeze_end : 1;
	/** True iff this evbuffer's callbacks are not invoked immediately
	 * upon a change in the buffer, but instead are deferred to be invoked
	 * from the event_base's loop.	Useful for preventing enormous stack
	 * overflows when we have mutually recursive callbacks, and for
	 * serializing callbacks in a single thread. */
	unsigned deferred_cbs : 1;
#ifdef _WIN32
	/** True iff this buffer is set up for overlapped IO. */
	unsigned is_overlapped : 1;
	/** Zero or more EVBUFFER_FLAG_* bits */
	ev_uint32_t flags;

	/** Used to implement deferred callbacks. */
	struct event_base *cb_queue;

	/** A reference count on this evbuffer.	 When the reference count
	 * reaches 0, the buffer is destroyed.	Manipulated with
	 * evbuffer_incref and evbuffer_decref_and_unlock and
	 * evbuffer_free. */
	int refcnt;

	/** A struct event_callback handle to make all of this buffer's callbacks
	 * invoked from the event loop. */
	struct event_callback deferred;

	/** A doubly-linked-list of callback functions */
	LIST_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;

	/** The parent bufferevent object this evbuffer belongs to.
	 * NULL if the evbuffer stands alone. */
	struct bufferevent *parent;
  • 结构体evbuffer_chain
/** A single node in an evbuffer's linked list of storage chunks. */
struct evbuffer_chain {
	/** points to next buffer in the chain */
	struct evbuffer_chain *next;

	/** total allocation available in the buffer field. */
	size_t buffer_len;

	/** unused space at the beginning of buffer or an offset into a
	 * file for sendfile buffers. */
	ev_misalign_t misalign;

	/** Offset into buffer + misalign at which to start writing.
	 * In other words, the total number of bytes actually stored
	 * in buffer. */
	size_t off;

	/** Set if special handling is required for this chain */
	unsigned flags;
#define EVBUFFER_FILESEGMENT	0x0001  /**< A chain used for a file segment */
#define EVBUFFER_SENDFILE	0x0002	/**< a chain used with sendfile */
#define EVBUFFER_REFERENCE	0x0004	/**< a chain with a mem reference */
#define EVBUFFER_IMMUTABLE	0x0008	/**< read-only chain */
	/** a chain that mustn't be reallocated or freed, or have its contents
	 * memmoved, until the chain is un-pinned. */
#define EVBUFFER_MEM_PINNED_R	0x0010
#define EVBUFFER_MEM_PINNED_W	0x0020
#define EVBUFFER_MEM_PINNED_ANY (EVBUFFER_MEM_PINNED_R|EVBUFFER_MEM_PINNED_W)
	/** a chain that should be freed, but can't be freed until it is
	 * un-pinned. */
#define EVBUFFER_DANGLING	0x0040
	/** a chain that is a referenced copy of another chain */
#define EVBUFFER_MULTICAST	0x0080

	/** number of references to this chain */
	int refcnt;

	/** Usually points to the read-write memory belonging to this
	 * buffer allocated as part of the evbuffer_chain allocation.
	 * For mmap, this can be a read-only buffer and
	 * EVBUFFER_IMMUTABLE will be set in flags.  For sendfile, it
	 * may point to NULL.
	 */
	unsigned char *buffer;
};

本文标签: 缓冲区队列服务器正文libevent