twemproxy Source Code Analysis, Chapter 2: Implementation Analysis

Author: Chen Ke

Contact: chenke1818@gmail.com

Please credit the source when reposting: http://www.dumpcache.com/wiki/doku.php?id=about_twemproxy_2

1. Data Structures

To manage the connections between clients and the proxy, and between the proxy and the backend servers, twemproxy uses a doubly linked tail queue to hold each client's or server's connections; the same queue also indirectly provides LRU behavior.

The queue is implemented with the BSD-style TAILQ macros, which frankly does not help readability:

#define TAILQ_ENTRY(type)                                               \
struct {                                                                \
    struct type *tqe_next;  /* next element */                          \
    struct type **tqe_prev; /* address of previous next element */      \
    TRACEBUF                                                            \
}

For example, declaring the head type for a queue of connections:

TAILQ_HEAD(conn_tqh, conn);
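TAILQ_HEAD(conn_tqh, conn) expands to a small struct holding a pointer to the first element and the address of the last element's next pointer. The standalone sketch below shows how these macros are typically used; it is illustrative only (the stripped-down struct conn and the free_connq demo mirror how twemproxy manages its free connection queue, but this is not twemproxy code):

#include <sys/queue.h>   /* twemproxy bundles the same macros in its own header */
#include <stdlib.h>

struct conn;

TAILQ_HEAD(conn_tqh, conn);          /* struct conn_tqh { first/last pointers } */

struct conn {
    TAILQ_ENTRY(conn) conn_tqe;      /* next/prev links embedded in the element */
    int               sd;            /* stand-in for the real struct conn fields */
};

static struct conn_tqh free_connq;   /* queue of reusable connection objects */

static void
queue_demo(void)
{
    struct conn *c = malloc(sizeof(*c));
    if (c == NULL) {
        return;
    }

    TAILQ_INIT(&free_connq);                       /* start with an empty queue */
    TAILQ_INSERT_TAIL(&free_connq, c, conn_tqe);   /* push at the tail */

    if (!TAILQ_EMPTY(&free_connq)) {
        struct conn *head = TAILQ_FIRST(&free_connq);
        TAILQ_REMOVE(&free_connq, head, conn_tqe); /* pop from the head */
        free(head);
    }
}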

2. Server Model

1. A single twemproxy instance can run proxy listeners on multiple ports, one per configured server pool.

2. Each proxy maintains many client connections and, at the same time, connections to each backend server (by default only one connection per server).

3. Data is passed between a client connection and a server connection by manipulating the imsg_q and omsg_q queue pointers; no payload is copied. (The diagram in the original article only shows the imsg_q operations; a sketch of how the event loop drives these connections follows below.)
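All of these connections are driven by one event loop. The sketch below is loosely modeled on core_core() in nc_core.c but is not the verbatim code: error handling and stats are omitted, and EVENT_READ/EVENT_WRITE simply stand for the read/write readiness flags of twemproxy's event wrapper. The point is that every registered descriptor carries its struct conn, so a single routine serves proxy listeners, client connections and server connections through the per-connection handler pointers shown later in this article.

/*
 * Illustrative dispatch routine (not verbatim twemproxy code).
 */
static void
core_dispatch_sketch(struct context *ctx, struct conn *conn, uint32_t events)
{
    rstatus_t status;

    if (events & EVENT_READ) {
        /* proxy listener: accept; client/server connections: msg_recv */
        status = conn->recv(ctx, conn);
        if (status != NC_OK) {
            conn->close(ctx, conn);
            return;
        }
    }

    if (events & EVENT_WRITE) {
        /* client/server connections: msg_send */
        status = conn->send(ctx, conn);
        if (status != NC_OK) {
            conn->close(ctx, conn);
        }
    }
}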

3. Request Flow

The flow of a complete request, along with the function calls involved, is shown in the diagram below (reproduced from a comment in the twemproxy source):

/*
 *             Client+             Proxy           Server+
 *                              (nutcracker)
 *                                   .
 *       msg_recv {read event}       .       msg_recv {read event}
 *         +                         .                         +
 *         |                         .                         |
 *         \                         .                         /
 *         req_recv_next             .             rsp_recv_next
 *           +                       .                       +
 *           |                       .                       |       Rsp
 *           req_recv_done           .           rsp_recv_done      <===
 *             +                     .                     +
 *             |                     .                     |
 *    Req      \                     .                     /
 *    ===>     req_filter*           .           *rsp_filter
 *               +                   .                   +
 *               |                   .                   |
 *               \                   .                   /
 *               req_forward-//  (a) . (c)  \\-rsp_forward
 *                                   .
 *                                   .
 *       msg_send {write event}      .      msg_send {write event}
 *         +                         .                         +
 *         |                         .                         |
 *    Rsp' \                         .                         /     Req'
 *   <===  rsp_send_next             .             req_send_next     ===>
 *           +                       .                       +
 *           |                       .                       |
 *           \                       .                       /
 *           rsp_send_done-//    (d) . (b)    //-req_send_done
 *
 *
 * (a) -> (b) -> (c) -> (d) is the normal flow of transaction consisting
 * of a single request response, where (a) and (b) handle request from
 * client, while (c) and (d) handle the corresponding response from the
 * server.
 */

4. Zero-Copy Implementation

Through clever pointer manipulation, twemproxy forwards a msg read from a client to a server without copying the payload, and the same holds in the opposite direction for responses.

Let's first look at how a connection is represented:

struct conn {
    TAILQ_ENTRY(conn)  conn_tqe;      /* link in server_pool / server / free q */
    void               *owner;        /* connection owner - server_pool / server */
 
    int                sd;            /* socket descriptor */
    int                family;        /* socket address family */
    socklen_t          addrlen;       /* socket length */
    struct sockaddr    *addr;         /* socket address (ref in server or server_pool) */
 
    struct msg_tqh     imsg_q;        /* incoming request Q */
    struct msg_tqh     omsg_q;        /* outstanding request Q */
    struct msg         *rmsg;         /* current message being rcvd */
    struct msg         *smsg;         /* current message being sent */
 
    conn_recv_t        recv;          /* recv (read) handler */
    conn_recv_next_t   recv_next;     /* recv next message handler */
    conn_recv_done_t   recv_done;     /* read done handler */
    conn_send_t        send;          /* send (write) handler */
    conn_send_next_t   send_next;     /* write next message handler */
    conn_send_done_t   send_done;     /* write done handler */
    conn_close_t       close;         /* close handler */
    conn_active_t      active;        /* active? handler */
 
    conn_ref_t         ref;           /* connection reference handler */
    conn_unref_t       unref;         /* connection unreference handler */
 
    conn_msgq_t        enqueue_inq;   /* connection inq msg enqueue handler */
    conn_msgq_t        dequeue_inq;   /* connection inq msg dequeue handler */
    conn_msgq_t        enqueue_outq;  /* connection outq msg enqueue handler */
    conn_msgq_t        dequeue_outq;  /* connection outq msg dequeue handler */
 
    size_t             recv_bytes;    /* received (read) bytes */
    size_t             send_bytes;    /* sent (written) bytes */
 
    uint32_t           events;        /* connection io events */
    err_t              err;           /* connection errno */
    unsigned           recv_active:1; /* recv active? */
    unsigned           recv_ready:1;  /* recv ready? */
    unsigned           send_active:1; /* send active? */
    unsigned           send_ready:1;  /* send ready? */
 
    unsigned           client:1;      /* client? or server? */
    unsigned           proxy:1;       /* proxy? */
    unsigned           connecting:1;  /* connecting? */
    unsigned           connected:1;   /* connected? */
    unsigned           eof:1;         /* eof? aka passive close? */
    unsigned           done:1;        /* done? aka close? */
    unsigned           redis:1;       /* redis? */
};

The relevant fields are:

    struct msg_tqh     imsg_q;        /* incoming request Q */
    struct msg_tqh     omsg_q;        /* outstanding request Q */
    struct msg         *rmsg;         /* current message being rcvd */
    struct msg         *smsg;         /* current message being sent */

imsg_q is the queue of incoming requests that have not yet been sent on; omsg_q is the queue of outstanding requests that have been sent and are waiting for a response.

Zero-copy is achieved by moving the rmsg and smsg pointers instead of copying buffers. Let's look at the receive path first:

static rstatus_t
msg_recv_chain(struct context *ctx, struct conn *conn, struct msg *msg)
{
    rstatus_t status;
    struct msg *nmsg;
    struct mbuf *mbuf;
    size_t msize;
    ssize_t n;
 
    mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next);
    if (mbuf == NULL || mbuf_full(mbuf)) {
        mbuf = mbuf_get();
        if (mbuf == NULL) {
            return NC_ENOMEM;
        }
        mbuf_insert(&msg->mhdr, mbuf);
        msg->pos = mbuf->pos;
    }
    ASSERT(mbuf->end - mbuf->last > 0);
 
    msize = mbuf_size(mbuf);
 
    n = conn_recv(conn, mbuf->last, msize);
    if (n < 0) {
        if (n == NC_EAGAIN) {
            return NC_OK;
        }
        return NC_ERROR;
    }
 
    ASSERT((mbuf->last + n) <= mbuf->end);
    mbuf->last += n;
    msg->mlen += (uint32_t)n;
 
    for (;;) {
        status = msg_parse(ctx, conn, msg);
        if (status != NC_OK) {
            return status;
        }
 
        /* get next message to parse */
        nmsg = conn->recv_next(ctx, conn, false);
        if (nmsg == NULL || nmsg == msg) {
            /* no more data to parse */
            break;
        }
 
        msg = nmsg;
    }
 
    return NC_OK;
}
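For reference, the pos, last and end markers manipulated above come from the mbuf layout. The definition below follows nc_mbuf.h, reproduced from memory, so field order and comments may differ slightly from the exact source. Unparsed data lives between pos and last, and free space runs from last to end.

struct mbuf {
    uint32_t           magic;   /* mbuf magic (const) */
    STAILQ_ENTRY(mbuf) next;    /* next mbuf in the msg's mhdr chain */
    uint8_t            *pos;    /* read marker */
    uint8_t            *last;   /* write marker */
    uint8_t            *start;  /* start of buffer (const) */
    uint8_t            *end;    /* end of buffer (const) */
};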

On the receive path, the bytes read from the socket are appended to the message's mbuf chain. Now let's look at the send side:

struct msg *
req_send_next(struct context *ctx, struct conn *conn)
{
    rstatus_t status;
    struct msg *msg, *nmsg; /* current and next message */
 
    ASSERT(!conn->client && !conn->proxy);
 
    if (conn->connecting) {
        server_connected(ctx, conn);
    }
 
    nmsg = TAILQ_FIRST(&conn->imsg_q);
    if (nmsg == NULL) {
        /* nothing to send as the server inq is empty */
        status = event_del_out(ctx->evb, conn);
        if (status != NC_OK) {
            conn->err = errno;
        }
 
        return NULL;
    }
 
    msg = conn->smsg;
    if (msg != NULL) {
        ASSERT(msg->request && !msg->done);
        nmsg = TAILQ_NEXT(msg, s_tqe);
    }
 
    conn->smsg = nmsg;
 
    if (nmsg == NULL) {
        return NULL;
    }
 
    ASSERT(nmsg->request && !nmsg->done);
 
    log_debug(LOG_VVERB, "send next req %"PRIu64" len %"PRIu32" type %d on "
              "s %d", nmsg->id, nmsg->mlen, nmsg->type, conn->sd);
 
    return nmsg;
}

Here conn->smsg ends up pointing at the very msg that was placed on the server connection's imsg_q when the request was received; the request bytes are never copied between the client and server connections.
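How did that msg get onto the server connection's imsg_q in the first place? The forwarding step on the client side enqueues the parsed request, by pointer, onto the chosen server connection. The sketch below is modeled on req_forward in nc_request.c; the msg->key_start / msg->key_end fields and the omitted error handling are simplifications, so treat it as illustrative rather than the exact source:

/*
 * Condensed, illustrative request forwarding.
 * msg->key_start / msg->key_end are assumptions about where the parsed
 * key lives; the real layout differs across twemproxy versions.
 */
static void
req_forward_sketch(struct context *ctx, struct conn *c_conn, struct msg *msg)
{
    struct server_pool *pool = c_conn->owner;
    struct conn *s_conn;
    uint8_t *key = msg->key_start;
    uint32_t keylen = (uint32_t)(msg->key_end - msg->key_start);

    /* pick a server connection via ketama/modula/random */
    s_conn = server_pool_conn(ctx, pool, key, keylen);
    if (s_conn == NULL) {
        return;  /* the real code replies to the client with an error */
    }

    /* make sure we get a write event on the server connection */
    if (TAILQ_EMPTY(&s_conn->imsg_q)) {
        event_add_out(ctx->evb, s_conn);
    }

    /* hand over the same msg object by pointer: this is the zero-copy step */
    s_conn->enqueue_inq(ctx, s_conn, msg);
}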

5. Distribution Strategies

twemproxy supports three distribution strategies:

ketama: an implementation of consistent hashing

modula: maps a key to a server by hashing it and taking the modulus of the number of points

random: picks a server at random, ignoring the key (sketches of the modula and random dispatchers follow below)
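For comparison, the two simpler dispatchers are tiny. The versions below are written from memory of nc_modula.c and nc_random.c with the asserts trimmed, so treat them as sketches rather than verbatim source:

/* modula: position in the continuum is the hash modulo the number of points */
uint32_t
modula_dispatch(struct continuum *continuum, uint32_t ncontinuum, uint32_t hash)
{
    struct continuum *c = continuum + hash % ncontinuum;

    return c->index;
}

/* random: ignore the hash entirely and pick any live server */
uint32_t
random_dispatch(struct continuum *continuum, uint32_t ncontinuum, uint32_t hash)
{
    struct continuum *c = continuum + random() % ncontinuum;

    return c->index;
}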

Here we take a closer look at the ketama algorithm. The ring is built (and rebuilt) by ketama_update:

rstatus_t
ketama_update(struct server_pool *pool)
{
    uint32_t nserver;             /* # server - live and dead */
    uint32_t nlive_server;        /* # live server */
    uint32_t pointer_per_server;  /* pointers per server proportional to weight */
    uint32_t pointer_per_hash;    /* pointers per hash */
    uint32_t pointer_counter;     /* # pointers on continuum */
    uint32_t pointer_index;       /* pointer index */
    uint32_t points_per_server;   /* points per server */
    uint32_t continuum_index;     /* continuum index */
    uint32_t continuum_addition;  /* extra space in the continuum */
    uint32_t server_index;        /* server index */
    uint32_t value;               /* continuum value */
    uint32_t total_weight;        /* total live server weight */
    int64_t now;                  /* current timestamp in usec */
 
    ASSERT(array_n(&pool->server) > 0);
 
    now = nc_usec_now();
    if (now < 0) {
        return NC_ERROR;
    }
 
    /*
     * Count live servers and total weight, and also update the next time to
     * rebuild the distribution
     */
    nserver = array_n(&pool->server);
    nlive_server = 0;
    total_weight = 0;
    pool->next_rebuild = 0LL;
    for (server_index = 0; server_index < nserver; server_index++) {
        struct server *server = array_get(&pool->server, server_index);
 
        if (pool->auto_eject_hosts) {
            if (server->next_retry <= now) {
                server->next_retry = 0LL;
                nlive_server++;
            } else if (pool->next_rebuild == 0LL ||
                       server->next_retry < pool->next_rebuild) {
                pool->next_rebuild = server->next_retry;
            }
        } else {
            nlive_server++;
        }
 
        ASSERT(server->weight > 0);
 
        /* count weight only for live servers */
        if (!pool->auto_eject_hosts || server->next_retry <= now) {
            total_weight += server->weight;
        }
    }
 
    pool->nlive_server = nlive_server;
 
    if (nlive_server == 0) {
        log_debug(LOG_DEBUG, "no live servers for pool %"PRIu32" '%.*s'",
                  pool->idx, pool->name.len, pool->name.data);
 
        return NC_OK;
    }
    log_debug(LOG_DEBUG, "%"PRIu32" of %"PRIu32" servers are live for pool "
              "%"PRIu32" '%.*s'", nlive_server, nserver, pool->idx,
              pool->name.len, pool->name.data);
 
    continuum_addition = KETAMA_CONTINUUM_ADDITION;
    points_per_server = KETAMA_POINTS_PER_SERVER;
    /*
     * Allocate the continuum for the pool, the first time, and every time we
     * add a new server to the pool
     */
    if (nlive_server > pool->nserver_continuum) {
        struct continuum *continuum;
        uint32_t nserver_continuum = nlive_server + continuum_addition;
        uint32_t ncontinuum = nserver_continuum * points_per_server;
 
        continuum = nc_realloc(pool->continuum, sizeof(*continuum) * ncontinuum);
        if (continuum == NULL) {
            return NC_ENOMEM;
        }
 
        pool->continuum = continuum;
        pool->nserver_continuum = nserver_continuum;
        /* pool->ncontinuum is initialized later as it could be <= ncontinuum */
    }
 
    /*
     * Build a continuum with the servers that are live and points from
     * these servers that are proportial to their weight
     */
    continuum_index = 0;
    pointer_counter = 0;
    for (server_index = 0; server_index < nserver; server_index++) {
        struct server *server;
        float pct;
 
        server = array_get(&pool->server, server_index);
 
        if (pool->auto_eject_hosts && server->next_retry > now) {
            continue;
        }
 
        pct = (float)server->weight / (float)total_weight;
        pointer_per_server = (uint32_t) ((floorf((float) (pct * KETAMA_POINTS_PER_SERVER / 4 * (float)nlive_server + 0.0000000001))) * 4);
        pointer_per_hash = 4;
 
        log_debug(LOG_VERB, "%.*s:%"PRIu16" weight %"PRIu32" of %"PRIu32" "
                  "pct %0.5f points per server %"PRIu32"",
                  server->name.len, server->name.data, server->port,
                  server->weight, total_weight, pct, pointer_per_server);
 
        for (pointer_index = 1;
             pointer_index <= pointer_per_server / pointer_per_hash;
             pointer_index++) {
 
            char host[KETAMA_MAX_HOSTLEN]= "";
            size_t hostlen;
            uint32_t x;
 
            hostlen = snprintf(host, KETAMA_MAX_HOSTLEN, "%.*s-%u",
                               server->name.len, server->name.data,
                               pointer_index - 1);
 
            for (x = 0; x < pointer_per_hash; x++) {
                value = ketama_hash(host, hostlen, x);
                pool->continuum[continuum_index].index = server_index;
                pool->continuum[continuum_index++].value = value;
            }
        }
        pointer_counter += pointer_per_server;
    }
 
    pool->ncontinuum = pointer_counter;
    qsort(pool->continuum, pool->ncontinuum, sizeof(*pool->continuum),
          ketama_item_cmp);
 
    for (pointer_index = 0;
         pointer_index < ((nlive_server * KETAMA_POINTS_PER_SERVER) - 1);
         pointer_index++) {
        if (pointer_index + 1 >= pointer_counter) {
            break;
        }
        ASSERT(pool->continuum[pointer_index].value <=
               pool->continuum[pointer_index + 1].value);
    }
 
    log_debug(LOG_VERB, "updated pool %"PRIu32" '%.*s' with %"PRIu32" of "
              "%"PRIu32" servers live in %"PRIu32" slots and %"PRIu32" "
              "active points in %"PRIu32" slots", pool->idx,
              pool->name.len, pool->name.data, nlive_server, nserver,
              pool->nserver_continuum, pool->ncontinuum,
              (pool->nserver_continuum + continuum_addition) * points_per_server);
 
    return NC_OK;
}

The algorithm above hashes each live server by name and places the resulting points on a hash ring (the continuum); a server with a larger weight gets proportionally more points on the ring, and therefore a larger share of the keys.
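Lookups on the ring are then a binary search for the first continuum point whose value is not smaller than the key's hash, wrapping around to the beginning if no such point exists. The version below follows ketama_dispatch in nc_ketama.c, reproduced from memory and lightly condensed:

uint32_t
ketama_dispatch(struct continuum *continuum, uint32_t ncontinuum, uint32_t hash)
{
    struct continuum *begin, *end, *left, *right, *middle;

    begin = left = continuum;
    end = right = continuum + ncontinuum;

    /* binary search for the first point with value >= hash */
    while (left < right) {
        middle = left + (right - left) / 2;
        if (middle->value < hash) {
            left = middle + 1;
        } else {
            right = middle;
        }
    }

    /* past the last point: wrap around to the start of the ring */
    if (right == end) {
        right = begin;
    }

    return right->index;  /* index of the chosen server in pool->server */
}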

If a server's next_retry time is still in the future (i.e. the server is currently ejected), it is skipped when the ring is built:

if (pool->auto_eject_hosts && server->next_retry > now) {
    continue;
}

When requests to a server keep failing, the server records a next-retry time so that it can be probed again after a while:

static void
server_failure(struct context *ctx, struct server *server)
{
    struct server_pool *pool = server->owner;
    int64_t now, next;
    rstatus_t status;
 
    if (!pool->auto_eject_hosts) {
        return;
    }
 
    server->failure_count++;
 
    log_debug(LOG_VERB, "server '%.*s' failure count %"PRIu32" limit %"PRIu32,
              server->pname.len, server->pname.data, server->failure_count,
              pool->server_failure_limit);
 
    if (server->failure_count < pool->server_failure_limit) {
        return;
    }
 
    now = nc_usec_now();
    if (now < 0) {
        return;
    }
 
    stats_server_set_ts(ctx, server, server_ejected_at, now);
 
    next = now + pool->server_retry_timeout;
 
    log_debug(LOG_INFO, "update pool %"PRIu32" '%.*s' to delete server '%.*s' "
              "for next %"PRIu32" secs", pool->idx, pool->name.len,
              pool->name.data, server->pname.len, server->pname.data,
              pool->server_retry_timeout / 1000 / 1000);
 
    stats_pool_incr(ctx, pool, server_ejects);
 
    server->failure_count = 0;
    server->next_retry = next;
 
    status = server_pool_run(pool);
    if (status != NC_OK) {
        log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx,
                  pool->name.len, pool->name.data, strerror(errno));
    }
}

In addition, the hash ring is rebuilt only when none of the following early-return checks (from server_pool_update) hold:

    if (!pool->auto_eject_hosts) {
        return NC_OK;
    }
 
    if (pool->next_rebuild == 0LL) {
        return NC_OK;
    }
 
    now = nc_usec_now();
    if (now < 0) {
        return NC_ERROR;
    }
 
    if (now <= pool->next_rebuild) {
        if (pool->nlive_server == 0) {
            errno = ECONNREFUSED;
            return NC_ERROR;
        }
        return NC_OK;
    }

The advantage of this approach is that a server is put back into use as soon as it recovers; the drawback is that if it stays down for a long time, requests that hash to it keep failing periodically as it is retried.

6. Connection Management

Each server maintains its own connection queue, s_conn_q:

struct server {
    uint32_t           idx;           /* server index */
    struct server_pool *owner;        /* owner pool */
 
    struct string      pname;         /* name:port:weight (ref in conf_server) */
    struct string      name;          /* name (ref in conf_server) */
    uint16_t           port;          /* port */
    uint32_t           weight;        /* weight */
    int                family;        /* socket family */
    socklen_t          addrlen;       /* socket length */
    struct sockaddr    *addr;         /* socket address (ref in conf_server) */
 
    uint32_t           ns_conn_q;     /* # server connection */
    struct conn_tqh    s_conn_q;      /* server connection q */
 
    int64_t            next_retry;    /* next retry time in usec */
    uint32_t           failure_count; /* # consecutive failures */
};

When a new connection object is needed, one is taken from the head of the global free connection queue (free_connq):

static struct conn *
_conn_get(void)
{
    struct conn *conn;
 
    if (!TAILQ_EMPTY(&free_connq)) {
        ASSERT(nfree_connq > 0);
 
        conn = TAILQ_FIRST(&free_connq);
        nfree_connq--;
        TAILQ_REMOVE(&free_connq, conn, conn_tqe);
    } else {
        conn = nc_alloc(sizeof(*conn));
        if (conn == NULL) {
            return NULL;
        }
    }
 
    conn->owner = NULL;
 
    conn->sd = -1;
    /* {family, addrlen, addr} are initialized in enqueue handler */
 
    TAILQ_INIT(&conn->imsg_q);
    TAILQ_INIT(&conn->omsg_q);
    conn->rmsg = NULL;
    conn->smsg = NULL;
 
    /*
     * Callbacks {recv, recv_next, recv_done}, {send, send_next, send_done},
     * {close, active}, parse, {ref, unref}, {enqueue_inq, dequeue_inq} and
     * {enqueue_outq, dequeue_outq} are initialized by the wrapper.
     */
 
    conn->send_bytes = 0;
    conn->recv_bytes = 0;
 
    conn->events = 0;
    conn->err = 0;
    conn->recv_active = 0;
    conn->recv_ready = 0;
    conn->send_active = 0;
    conn->send_ready = 0;
 
    conn->client = 0;
    conn->proxy = 0;
    conn->connecting = 0;
    conn->connected = 0;
    conn->eof = 0;
    conn->done = 0;
    conn->redis = 0;
 
    ntotal_conn++;
    ncurr_conn++;
 
    return conn;
}

If the free queue is empty, a fresh connection object is allocated instead. Reuse of the per-server s_conn_q itself works differently, as sketched below.
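The free list above only recycles struct conn objects. The LRU behavior mentioned in section 1 comes from the per-server s_conn_q: once a server has reached its connection limit, the least recently used connection is taken from the head of the queue and reinserted at the tail. The sketch below is modeled on server_conn in nc_server.c, simplified from memory with asserts removed:

/*
 * Simplified sketch of picking a connection to a backend server.
 * pool->server_connections is the configured per-server connection limit.
 */
struct conn *
server_conn_sketch(struct server *server)
{
    struct server_pool *pool = server->owner;
    struct conn *conn;

    /* below the limit: open another connection to this server */
    if (server->ns_conn_q < pool->server_connections) {
        return conn_get(server, false, pool->redis);
    }

    /*
     * At the limit: reuse the least recently used connection by taking
     * it from the head of s_conn_q and moving it to the tail.
     */
    conn = TAILQ_FIRST(&server->s_conn_q);
    TAILQ_REMOVE(&server->s_conn_q, conn, conn_tqe);
    TAILQ_INSERT_TAIL(&server->s_conn_q, conn, conn_tqe);

    return conn;
}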
