当前位置:
文档之家› memcached源代码分析
memcached源代码分析
Part2
Part2
Memcached的线程关系
typedef struct { pthread_t thread_id; struct event_base *base; struct event notify_event; int notify_receive_fd; int notify_send_fd; CQ new_conn_queue; } LIBEVENT_THREAD; void thread_init(int nthreads, struct event_base *main_base) { threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; setup_thread(&threads[i]); for (i = 1; i < nthreads; i++) { create_worker(worker_libevent, &threads[i]); } } static void setup_thread(LIBEVENT_THREAD *me) { if (! me->base) me->base = event_init(); event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event); if (event_add(&me->notify_event, 0) == -1) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } cq_init(&me->new_conn_queue); }
使用memcached的公司
Part2
预备知识
libevent简介 线程池模型简介
Part2
libevent是一个开源的、跨平台的事件处理函数库。 它封装了网络、信号以及定时器的事件,以及附带了一些缓冲区管理功能,非常适合用 来处理server的底层IO。对于网络事件,他封装了epoll, select, kqueue(BSD)等模 型,性能非常好。 下面是使用libevent的一个简单例子:
static int server_socket(const int port, const bool is_udp) { socket(); bind() listen(); conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, false, main_base); } conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp, struct event_base *base) { conn *c = conn_from_freelist(); event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event); c->ev_flags = event_flags; event_add(&c->event, 0) ; } //主线程和worker线程的公有事件处理回调函数 void event_handler(const int fd, const short which, void *arg) { conn *c; c = (conn *)arg; assert(c != NULL); c->which = which; drive_machine(c); //处理具体的事件 return; }
Libevent由于所有的事件都是由一个event_base对象来管理,是一个全局对象因此不支 持多线程。Memcached给每个线程创建了一个event_base对象,每个线程自己管理 自己的网络IO
Part2
static void delete_handler(const int fd, const short which, void *arg) { struct timeval t = {.tv_sec = 5, .tv_usec = 0}; static bool initialized = false; if (initialized) { evtimer_del(&deleteevent); } else { initialized = true; } evtimer_set(&deleteevent, delete_handler, 0); event_base_set(main_base, &deleteevent); evtimer_add(&deleteevent, &t); run_deferred_deletes(); } void do_run_deferred_deletes(void) { for (i = 0; i < delcurr; i++) { item *it = todelete[i]; if (item_delete_lock_over(it)) { it->it_flags &= ~ITEM_DELETED; do_item_unlink(it); do_item_remove(it); } else { todelete[j++] = it; } } delcurr = j; }
static void thread_libevent_process(int fd, short which, void *arg) { read(fd, buf, 1); item = cq_peek(&me->new_conn_queue); conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size/*2048*/, item->is_udp, me->base); }
Memcached源代码分析
looloochen 2009-7-20
P在用memcached Memcached的适用场合
Part 1
简介
memcached是高性能的,分布式的内存对象缓存系统,常常用于在应用 中减少数据库负载,提升访问速度。 memcached由Danga Interactive公司开发,是一个开源软件。 memcached一般作为数据库的前端cache使用,用于减少磁盘的访问, 它的数据都以key-value的形式存储,访问数据很简单,不像db解析 SQL,一条简单的命令即可存取数据。 可以使用unix域套接字,tcp以及udp协议访问memcached,因此使用 memcached是有代价的,如果想要缓存的数据不需要被共享,那么本 地缓存可能更适用一些。
static void drive_machine(conn *c) { while (!stop) { switch(c->state) { case conn_listening: //只有主线程才会有的事件 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, false); break; case conn_read: if (try_read_command(c) != 0) { continue; } if (try_read_network(c)) != 0) { continue; } update_event(c, EV_READ | EV_PERSIST); break; case conn_nread: complete_nread(c); break; res = read(c->sfd, c->ritem, c->rlbytes); if (res > 0) { c->ritem += res; c->rlbytes -= res; break; } if (res == 0) { /* end of stream */ conn_set_state(c, conn_closing); break; } if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { if (!update_event(c, EV_READ | EV_PERSIST)) { conn_set_state(c, conn_closing); break; } stop = true; break; } //其它错误,关闭socket conn_set_state(c, conn_closing); break; case conn_swallow: ... case conn_write: ... case conn_mwrite: ... ... case conn_closing: if (c->udp) conn_cleanup(c); else conn_close(c); stop = true; break; } } return; }
struct event_base *base = event_init(); event_set(&event, fd, EV_READ, callback_fun, arg); event_base_set(base, &event); event_add(&, 0); event_base_loop(base, 0); 它的功能相当于: While(1) { FD_SET(fd, rd_set); select(max_fd + 1, rd_set, NULL, NULL, tv); for(int i = 0; i < max_fd; ++i) if(FD_ISSET(i, rd_set)) callback_fun(fd, arg); }