??斗地主捕鱼电竞提现秒到 广告位招租 - 15元/月全站展示
??支付宝搜索579087183领大额红包 ??伍彩集团官网直营彩票
??好待遇→招代理 ??伍彩集团官网直营彩票
??络茄网 广告位招租 - 15元/月全站展示
memcached源码阅读----使用libevent

转载   2014-09-15   浏览量:1072


本篇文章主要是我今天阅读memcached源码关于进程启动,在网络这块做了哪些事情。


一、iblievent的使用

首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基于linux下epoll事件的异步模型。因此,其基本的思想就是 对可读,可写,超时,出错等事件进行绑定函数,等有其事件发生,对其绑定函数回调。

可以减掉了解一下 libevent基本api调用

struct event_base *base;
base = event_base_new();//初始化libevent

event_base_new对比epoll,可以理解为epoll里的epoll_create。

event_base内部有一个循环,循环阻塞在epoll调用上,当有一个事件发生的时候,才会去处理这个事件。其中,这个事件是被绑定在event_base上面的,每一个事件就会对应一个struct event,可以是监听的fd。

其中struct event 使用event_new 来创建和绑定,使用event_add来启用,例如:

struct event *listener_event;
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);


参数说明:

base:event_base类型,event_base_new的返回值

listener:监听的fd,listen的fd

EV_READ|EV_PERSIST:事件的类型及属性

do_accept:绑定的回调函数

(void*)base:给回调函数的参数

event_add(listener_event, NULL);

对比epoll:

event_new相当于epoll中的epoll_wait,其中的epoll里的while循环,在libevent里使用event_base_dispatch。

event_add相当于epoll中的epoll_ctl,参数是EPOLL_CTL_ADD,添加事件。

注:libevent支持的事件及属性包括(使用bitfield实现,所以要用 | 来让它们合体)
EV_TIMEOUT: 超时
EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发
EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发
EV_SIGNAL: POSIX信号量
EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除
EV_ET: Edge-Trigger边缘触发,相当于EPOLL的ET模式

事件创建添加之后,就可以处理发生的事件了,相当于epoll里的epoll_wait,在libevent里使用event_base_dispatch启动event_base循环,直到不再有需要关注的事件。


有了上面的分析,结合之前做的epoll服务端程序,对于一个服务器程序,流程基本是这样的:

1. 创建socket,bind,listen,设置为非阻塞模式

2. 创建一个event_base,即

[cpp] view plaincopy
  1. struct event_base * event_base_new(void)

    3. 创建一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)。即

    [cpp] view plaincopy
    1. struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)

      4. 启用该事件,即

      [cpp] view plaincopy
      1. int event_add(struct event *ev, const struct timeval *tv)

        5. 进入事件循环,即

        [cpp] view plaincopy
        1. int event_base_dispatch(struct event_base *event_base)


          有了上边的基础东西,可以进入memcached的阅读了。
          二、memcached源码分析
          main函数启动,首先会初始化很多数据,这里我们只涉及大网络这块,其他以后分析,先忽略。 1.首先初始化 主工作线程的的iblievet对象
           彩票开奖查询 www.kbyp.net   /* initialize main thread libevent instance */
              main_base = event_init();

          最后会调用
             /* enter the event loop */
              if (event_base_loop(main_base, 0) != 0) {
                  retval = EXIT_FAILURE;
              }

          在该对象内部循环。不退出。
          2.初始化连接的对象
          static void conn_init(void) {
              freetotal = 200;
              freecurr = 0;
              if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
                  fprintf(stderr, "Failed to allocate connection structures\n");
              }
              return;
          }


          这里是先预先分配200个conn*的内存。等有连接上来,会从freeconns 取。 如下代码:
          /*
           * Returns a connection from the freelist, if any.
           */
          conn *conn_from_freelist() {
              conn *c;
          
              pthread_mutex_lock(&conn_lock);
              if (freecurr > 0) {
                  c = freeconns[--freecurr];
              } else {
                  c = NULL;
              }
              pthread_mutex_unlock(&conn_lock);
          
              return c;
          }


          3.那么conn的结构体内部长什么样子呢?
          typedef struct conn conn;
          struct conn {
              int    sfd;
              sasl_conn_t *sasl_conn;
              enum conn_states  state;
              enum bin_substates substate;
              struct event event;
              short  ev_flags;
              short  which;   /** which events were just triggered */
          
              char   *rbuf;   /** buffer to read commands into */
              char   *rcurr;  /** but if we parsed some already, this is where we stopped */
              int    rsize;   /** total allocated size of rbuf */
              int    rbytes;  /** how much data, starting from rcur, do we have unparsed */
          
              char   *wbuf;
              char   *wcurr;
              int    wsize;
              int    wbytes;
              /** which state to go into after finishing current write */
              enum conn_states  write_and_go;
              void   *write_and_free; /** free this memory after finishing writing */
          
              char   *ritem;  /** when we read in an item's value, it goes here */
              int    rlbytes;
          
              /* data for the nread state */
          
              /**
               * item is used to hold an item structure created after reading the command
               * line of set/add/replace commands, but before we finished reading the actual
               * data. The data is read into ITEM_data(item) to avoid extra copying.
               */
          
              void   *item;     /* for commands set/add/replace  */
          
              /* data for the swallow state */
              int    sbytes;    /* how many bytes to swallow */
          
              /* data for the mwrite state */
              struct iovec *iov;
              int    iovsize;   /* number of elements allocated in iov[] */
              int    iovused;   /* number of elements used in iov[] */
          
              struct msghdr *msglist;
              int    msgsize;   /* number of elements allocated in msglist[] */
              int    msgused;   /* number of elements used in msglist[] */
              int    msgcurr;   /* element in msglist[] being transmitted now */
              int    msgbytes;  /* number of bytes in current msg */
          
              item   **ilist;   /* list of items to write out */
              int    isize;
              item   **icurr;
              int    ileft;
          
              char   **suffixlist;
              int    suffixsize;
              char   **suffixcurr;
              int    suffixleft;
          
              enum protocol protocol;   /* which protocol this con
            if (sigignore(SIGPIPE) == -1) {
                  perror("failed to ignore SIGPIPE; sigaction");
                  exit(EX_OSERR);
              }

          nection speaks */ enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ struct sockaddr request_addr; /* Who sent the most recent request */ socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ struct { char *buffer; size_t size; size_t offset; } stats; /* Binary protocol stuff */ /* This is where the binary header goes */ protocol_binary_request_header binary_header; uint64_t cas; /* the cas to return */ short cmd; /* current command being processed */ int opaque; int keylen; conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */};
          
          
          这里的所有字段就是在处理数据需要用到的。这里不详细描述。以后会慢慢分解。

          因为是memcached是多线程模型,因此在从freeconn取出一个对象的时候,是要加解锁使用。
          忽略SIGIPIE信号,防止rst时的程序退出
            if (sigignore(SIGPIPE) == -1) {
                  perror("failed to ignore SIGPIPE; sigaction");
                  exit(EX_OSERR);
              }

          初始化多线程模型,并且每个线程一个iblievent的事件模型就是调用event_init函数。
          /* start up worker threads if MT mode */
              thread_init(settings.num_threads, main_base);

          内部实现不详细。主要是调用pthread_create函数。
          4、然后开始通过端口号启动网络监听事件
          代码如下:
             if (settings.port && server_sockets(settings.port, tcp_transport,
                                                     portnumber_file)) {
                      vperror("failed to listen on TCP port %d", settings.port);
                      exit(EX_OSERR);
                  }

          然后调用下面的函数:
          static int server_socket(const char *interface,
                                   int port,
                                   enum network_transport transport,
                                   FILE *portnumber_file)

          因为,一个主机可能会有多个网卡,比如双线机房,联通或者电信,因此内部实现会出现以下代码:

           for (next= ai; next; next= next->ai_next) {
                  conn *listen_conn_add;
                  if ((sfd = new_socket(next)) == -1) {
                      /* getaddrinfo can return "junk" addresses,
                       * we make sure at least one works before erroring.
                       */
                      if (errno == EMFILE) {
                          /* ...unless we're out of fds */
                          perror("server_socket");
                          exit(EX_OSERR);
                      }
                      continue;
                  }
          

          static int new_socket(struct addrinfo *ai)

          该函数就是调用socket函数,设置为非阻塞。

          5、然后生成一个监听的conn对象
          代码如下
           if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                                       EV_READ | EV_PERSIST, 1,
                                                       transport, main_base))) {
                          fprintf(stderr, "failed to create listening connection\n");
                          exit(EXIT_FAILURE);
                      }
                      listen_conn_add->next = listen_conn;
                      listen_conn = listen_conn_add;

          static conn *listen_conn = NULL;
          作为全局的静态的变量。无头结点的单链表
          我们继续深入conn_new 函数内部
          conn *conn_new(const int sfd, enum conn_states init_state,
                          const int event_flags,
                          const int read_buffer_size, enum network_transport transport,
                          struct event_base *base) {
              conn *c = conn_from_freelist();


          该函数主要是做了哪些动作呢?
          第一,从刚才的free_cnn_list取出一个conn* 来,然互分配内存,根据相关配置信息,进行相关的字段初始化工作。

          第二,加入到iblievent事件库中
          event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
              event_base_set(base, &c->event);
              c->ev_flags = event_flags;
          
              if (event_add(&c->event, 0) == -1) {
                  if (conn_add_to_freelist(c)) {
                      conn_free(c);
                  }
                  perror("event_add");
                  return NULL;
              }

          这一步就是,讲sfd上的事件绑定event_handler 函数,就是当有该连接上来的时候有数据进行可读的时候绑定,回调。
          7、状态机的解读
          最终event_handler函数会调用
          static void drive_machine(conn *c)
          函数。那么这个函数做了哪些工作呢?
          当然是等待连接了,那就是accept函数了。 因此,入股市conn_listening状态,
            while (!stop) {
          
                  switch(c->state) {
                  case conn_listening:
                      addrlen = sizeof(addr);
                      if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1)


          当然同样是 讲sfd设置成非阻塞的。
          这个时候是有数据上来了。
          因此就要设置读命令状态了,调用以下函数:
          /*
           * Dispatches a new connection to another thread. This is only ever called
           * from the main thread, either during initialization (for UDP) or because
           * of an incoming connection.
           */
          void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                                 int read_buffer_size, enum network_transport transport) {
              CQ_ITEM *item = cqi_new();
              char buf[1];
              int tid = (last_thread + 1) % settings.num_threads;
          
              LIBEVENT_THREAD *thread = threads + tid;
          
              last_thread = tid;
          
              item->sfd = sfd;
              item->init_state = init_state;
              item->event_flags = event_flags;
              item->read_buffer_size = read_buffer_size;
              item->transport = transport;
          
              cq_push(thread->new_conn_queue, item);
          
              MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
              buf[0] = 'c';
              if (write(thread->notify_send_fd, buf, 1) != 1) {
                  perror("Writing to thread notify pipe");
              }
          }


          
          
          
          通过注释可以知道,该函数是讲一个新连接分配各其他线程,
          通过代码我们可以看出 首先,分配一个item块,讲连接的socket的fd 赋值给item,同时有当前状态,标志位,读buff大小等,然后分配一个线程,讲item推送到该thread的处理队列里了。
          然互,通过往管道里写入C字符,通知到管道的另一端,进行处理该操作符的事件。因此,完成了对对该连接的 分配工作。
          那么我接下来看一看 线程是如果处理的。
          在初始化线程的时候,已经把管道的两个操作符放入到了iblievent里了。如下代码:
            /* Listen for notifications from other threads */
              event_set(&me->notify_event, me->notify_receive_fd,
                        EV_READ | EV_PERSIST, thread_libevent_process, me);
              event_base_set(me->base, &me->notify_event);
          
              if (event_add(&me->notify_event, 0) == -1) {
                  fprintf(stderr, "Can't monitor libevent notify pipe\n");
                  exit(1);
              }



          绑定了回调函数:
          static void thread_libevent_process(int fd, short which, void *arg) 


          当读到字符'c'的时候,就从其中队列中取出一个item*,掉用一下函数

          conn *conn_new(const int sfd, enum conn_states init_state,
                          const int event_flags,
                          const int read_buffer_size, enum network_transport transport,
                          struct event_base *base) 

          同样,调用
           conn *c = conn_from_freelist();

          取出一个conn* ,然后进行初始化,这个时候和上文讲到的一样了,知识状态不同了, 因此这里使用了一个状态机的模式了。
          有如下状态:
          enum conn_states {
              conn_listening,  /**< the socket which listens for connections */
              conn_new_cmd,    /**< Prepare connection for next command */
              conn_waiting,    /**< waiting for a readable socket */
              conn_read,       /**< reading in a command line */
              conn_parse_cmd,  /**< try to parse a command from the input buffer */
              conn_write,      /**< writing out a simple response */
              conn_nread,      /**< reading in a fixed number of bytes */
              conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
              conn_closing,    /**< closing this connection */
              conn_mwrite,     /**< writing out many items sequentially */
              conn_max_state   /**< Max state value (used for assertion) */
          };

          也就是
          static void drive_machine(conn *c)
          的核心逻辑了。通过设置状态,然后调用不同的代码,
          因此在一个状态结束之后,总是会看大如下代码调用:
          /*
           * Sets a connection's current state in the state machine. Any special
           * processing that needs to happen on certain state transitions can
           * happen here.
           */
          static void conn_set_state(conn *c, enum conn_states state) {
              assert(c != NULL);
              assert(state >= conn_listening && state < conn_max_state);
          
              if (state != c->state) {
                  if (settings.verbose > 2) {
                      fprintf(stderr, "%d: going from %s to %s\n",
                              c->sfd, state_text(c->state),
                              state_text(state));
                  }
          
                  if (state == conn_write || state == conn_mwrite) {
                      MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
                  }
                  c->state = state;
              }
          }
          



          到此,网络框架部分已经基本处理完成。起始这个框架是非常简单而且实用的。 redis也是基本的思想模型,只不过是单线程的,而memcached是多线程的模型。在开发模式上可以有效的借鉴。

          该文章为原创文章,更多文章,欢迎访问 //blog.csdn.net/wallwind










转载自:https://www.2cto.com/kf/201409/333617.html

招聘 不方便扫码就复制添加关注:程序员招聘谷,微信号:jobs1024



memcached源码阅读----使用libevent
本篇文章主要是我今天阅读memcached源码关于进程启动,在网络这块做了哪些事情。一、iblievent的使用首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基
Spring学习(五)——集成memcached客户端
memcached是高性能的分布式内存缓存服务器。许多Web应用都将数据保存到RDBMS中,应用服务器从中读取数据并在浏览器中显示。但随着数据量的增大、访问的集中,就会出现RDBMS的负担加重、数据库响应恶化、网站显
Simple-Spring-Memcached深入研究一
Simple-Spring-Memcached(简称ssm),它也是一个通过Annatation与AOP来完成缓存数据操作的开源项目。仔细看了一下代码,基本上把我之前碰到的问题都解决了,而且MultiCache这一块的实现超出我的预期。该项目主要
java中memcached作为hibernate的二级缓存
因项目的问题,需要用到服务器缓存技术,我们选择了分布式的memcached来作为hibernate的二级缓存,需要用的的jar包如下:memcached-2.1.jarhibernate-memcached-1.2.2.jarspy-2.4.jar另外还需要下载memcached服务...
simple-spring-memcached简介
memcached是一款非常优秀的分布式缓存工具,有效提升了按主键检索数据的性能问题。而simple-spring-memcached组件通过与spring框架整合,让memcached的调用变得更加简单。simple-spring-memcached本质上是采用了...
Memcached和Redis分布式锁方案实例讲解
Memcached和Redis分布式锁方案实例讲解。分布式缓存,能解决单台服务器内存不能无限扩张的瓶颈。在分布式缓存的应用中,会遇到多个客户端同时争用的问题。这个时候,需要用到分布式锁,得到锁的客户端才有操作权限。
Redis和Memcached的对比讲解
Redis和Memcached的对比讲解。实际项目开发中,我们经常使用Redis做缓存,也是当前最流行的Nosql数据库,那么Redis有什么优势呢?我们可以和另外一个缓存Memcached做一下比较,让我们在两者比较重来理解Redis的优势和使用。
ehcache、memcache、redis三大缓存的使用比较
ehcache、memcache、redis三大缓存的使用比较。最近项目组有用到这三个缓存,去各自的官方看了下,觉得还真的各有千秋!今天特意归纳下各个缓存的优缺点,仅供参考!
Memcached概念、原理及使用场合讲解
Memcached概念、原理及使用场合讲解。概念:Memcached是一套分布式内存对象缓存系统,使用于在动态系统中减少数据库负载,提升程序的性能。原理:Memcached是一种基于内存的key-value存储,用来存储小块的任意数据(字符串、对象)。这些数据可以是数据库调用、API调用或
memcached常用命令及使用讲解
memcached常用命令及使用讲解。1、启动Memcache常用参数