Memcached's Memory Management Mechanism

Author: 陈科 (Chen Ke)

Contact: chenke1818@gmail.com

Please credit the source when reposting: http://www.dumpcache.com/wiki/doku.php?id=about_memcached_2

1. slabclass initialization

During startup, the main function calls:

slabs_init(settings.maxbytes, settings.factor, preallocate);

Let's walk through what slabs_init does:

void slabs_init(const size_t limit, const double factor, const bool prealloc) {
    int i = POWER_SMALLEST - 1;
    unsigned int size = sizeof(item) + settings.chunk_size;
 
    mem_limit = limit;
 
   //mem_base points to the start of the preallocated memory region
    if (prealloc) {
        /* Allocate everything in a big chunk with malloc */
        mem_base = malloc(mem_limit);
        if (mem_base != NULL) {
            mem_current = mem_base;
            mem_avail = mem_limit;
        } else {
            fprintf(stderr, "Warning: Failed to allocate requested memory in"
                    " one large chunk.\nWill allocate in smaller chunks\n");
        }
    }
 
    memset(slabclass, 0, sizeof(slabclass));
 
   //POWER_LARGEST is the largest allowed slab class id (an upper bound on the number of classes)
   //every slab page has the same size: settings.item_size_max (1MB by default)
   //size is the chunk size of this class; it grows by the factor from one class to the next
   //perslab is how many chunks of this size fit into one slab page
    while (++i < POWER_LARGEST && size <= settings.item_size_max / factor) {
        /* Make sure items are always n-byte aligned */
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
 
        slabclass[i].size = size;
        slabclass[i].perslab = settings.item_size_max / slabclass[i].size;
        size *= factor;
        if (settings.verbose > 1) {
            fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                    i, slabclass[i].size, slabclass[i].perslab);
        }
    }
 
 
    //the largest slab class uses chunks of item_size_max bytes, so each of its slab pages holds exactly one chunk
    power_largest = i;
    slabclass[power_largest].size = settings.item_size_max;
    slabclass[power_largest].perslab = 1;
    if (settings.verbose > 1) {
        fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                i, slabclass[i].size, slabclass[i].perslab);
    }
 
    /* for the test suite:  faking of how much we've already malloc'd */
    {
        char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
        if (t_initial_malloc) {
            mem_malloced = (size_t)atol(t_initial_malloc);
        }
 
    }
 
    if (prealloc) {
        //allocate an initial slab page for every slab class
        slabs_preallocate(power_largest);
    }
}
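
As a side note, the chunk-size progression this loop produces can be previewed with a small standalone program. The sketch below is not memcached source; it simply repeats the same arithmetic using assumed 1.4.x defaults (48-byte chunk_size, an item header of roughly 48 bytes on 64-bit builds, growth factor 1.25, 1MB item_size_max), so the exact numbers may differ from your build:

#include <stdio.h>
 
/* Assumed defaults, mirroring memcached 1.4.x settings: */
#define CHUNK_ALIGN_BYTES 8
#define ITEM_HEADER       48              /* approx. sizeof(item) on 64-bit */
#define CHUNK_SIZE        48              /* settings.chunk_size            */
#define FACTOR            1.25            /* settings.factor                */
#define ITEM_SIZE_MAX     (1024 * 1024)   /* settings.item_size_max         */
 
int main(void) {
    unsigned int size = ITEM_HEADER + CHUNK_SIZE;
    int i = 0;
    while (size <= ITEM_SIZE_MAX / FACTOR) {
        /* keep chunks 8-byte aligned, exactly like slabs_init does */
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
        printf("slab class %3d: chunk size %9u perslab %7u\n",
               ++i, size, ITEM_SIZE_MAX / size);
        size *= FACTOR;
    }
    /* the final class gets one chunk of item_size_max bytes per page */
    printf("slab class %3d: chunk size %9u perslab %7u\n",
           ++i, (unsigned int)ITEM_SIZE_MAX, 1u);
    return 0;
}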

Next, let's look at slabs_preallocate:

//slabs_preallocate allocates one 1MB slab page for every slab class up front
static void slabs_preallocate (const unsigned int maxslabs) {
    int i;
    unsigned int prealloc = 0;
 
    /* pre-allocate a 1MB slab in every size class so people don't get
       confused by non-intuitive "SERVER_ERROR out of memory"
       messages.  this is the most common question on the mailing
       list.  if you really don't want this, you can rebuild without
       these three lines.  */
 
    for (i = POWER_SMALLEST; i <= POWER_LARGEST; i++) {
        if (++prealloc > maxslabs)
            return;
        if (do_slabs_newslab(i) == 0) {
            fprintf(stderr, "Error while preallocating slab memory!\n"
                "If using -L or other prealloc options, max memory must be "
                "at least %d megabytes.\n", power_largest);
            exit(1);
        }
    }
 
}

Next comes do_slabs_newslab:

static int do_slabs_newslab(const unsigned int id) {
    slabclass_t *p = &slabclass[id];
    int len = settings.slab_reassign ? settings.item_size_max
        : p->size * p->perslab;
    char *ptr;
    //grow_slab_list grows the slab_list array that records this class's slab pages
    //memory_allocate carves len bytes out of the preallocated region (or malloc's them) and returns the pointer ptr
    if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0) ||
        (grow_slab_list(id) == 0) ||
        ((ptr = memory_allocate((size_t)len)) == 0)) {
 
        MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
        return 0;
    }
    //zero the newly obtained slab page
    memset(ptr, 0, (size_t)len);
    //split_slab_page_into_freelist carves the page into chunks and puts each one on this class's free list
    split_slab_page_into_freelist(ptr, id);
 
    p->slab_list[p->slabs++] = ptr;
    mem_malloced += len;
    MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);
 
    return 1;
}
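
For reference, here is a condensed sketch of what the two helpers mentioned above boil down to. This is a paraphrase rather than the verbatim memcached source, but it follows the same logic: memory_allocate either hands out size bytes from the big preallocated region by advancing mem_current, or falls back to malloc; split_slab_page_into_freelist then walks the returned page chunk by chunk and frees each chunk into the class, which places it on the slots free list:

static void *memory_allocate(size_t size) {
    void *ret;
    if (mem_base == NULL) {
        /* no big preallocated block: plain malloc */
        ret = malloc(size);
    } else {
        /* carve the next size bytes out of the preallocated block */
        ret = mem_current;
        if (size > mem_avail)
            return NULL;
        /* keep mem_current aligned for the next caller */
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
        mem_current = ((char *)mem_current) + size;
        mem_avail = (size < mem_avail) ? mem_avail - size : 0;
    }
    return ret;
}
 
static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
    slabclass_t *p = &slabclass[id];
    unsigned int x;
    for (x = 0; x < p->perslab; x++) {
        do_slabs_free(ptr, 0, id);   /* push this chunk onto p->slots */
        ptr += p->size;
    }
}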

2. Allocating memory for a request

While main is setting up libevent, the following callback is registered for each connection:

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);

event_handler in turn calls drive_machine.

When a set request is processed, it eventually calls item_alloc:

item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
    return it;
}

As we can see, it simply delegates to do_item_alloc:

item *do_item_alloc(char *key, const size_t nkey, const int flags,
                    const rel_time_t exptime, const int nbytes,
                    const uint32_t cur_hv) {
    uint8_t nsuffix;
    item *it = NULL;
    char suffix[40];
    //compute the total number of bytes this item will occupy
    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }
 
    //find the slab class with the smallest chunk size that can hold ntotal
    unsigned int id = slabs_clsid(ntotal);
    if (id == 0)
        return 0;
 
    mutex_lock(&cache_lock);
    /* do a quick check if we have any expired items in the tail.. */
    int tries = 5;
    /* Avoid hangs if a slab has nothing but refcounted stuff in it. */
    int tries_lrutail_reflocked = 1000;
    int tried_alloc = 0;
    item *search;
    item *next_it;
    void *hold_lock = NULL;
    rel_time_t oldest_live = settings.oldest_live;
 
    //tails[id] is the tail of this slab class's LRU list of allocated items; when allocating, an already-expired item found at the tail is reused first
    search = tails[id];
    /* We walk up *only* for locked items. Never searching for expired.
     * Waste of CPU for almost all deployments */
    for (; tries > 0 && search != NULL; tries--, search=next_it) {
        /* we might relink search mid-loop, so search->prev isn't reliable */
        next_it = search->prev;
 
        if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1) {
            /* We are a crawler, ignore it. */
            tries++;
            continue;
        }
        uint32_t hv = hash(ITEM_key(search), search->nkey);
        /* Attempt to hash item lock the "search" item. If locked, no
         * other callers can incr the refcount
         */
        /* Don't accidentally grab ourselves, or bail if we can't quicklock */
        if (hv == cur_hv || (hold_lock = item_trylock(hv)) == NULL)
            continue;
        /* Now see if the item is refcount locked */
        if (refcount_incr(&search->refcount) != 2) {
            /* Avoid pathological case with ref'ed items in tail */
            do_item_update_nolock(search);
            tries_lrutail_reflocked--;
            tries++;
            refcount_decr(&search->refcount);
            itemstats[id].lrutail_reflocked++;
            /* Old rare bug could cause a refcount leak. We haven't seen
             * it in years, but we leave this code in to prevent failures
             * just in case */
            if (settings.tail_repair_time &&
                    search->time + settings.tail_repair_time < current_time) {
                itemstats[id].tailrepairs++;
                search->refcount = 1;
                do_item_unlink_nolock(search, hv);
            }
            if (hold_lock)
                item_trylock_unlock(hold_lock);
 
            if (tries_lrutail_reflocked < 1)
                break;
 
            continue;
        }
 
        /* Expired or flushed */
        /* First check whether the item at the tail of the LRU list has expired; if so, hand that item back to the caller */
        if ((search->exptime != 0 && search->exptime < current_time)
            || (search->time <= oldest_live && oldest_live <= current_time)) {
            itemstats[id].reclaimed++;
            if ((search->it_flags & ITEM_FETCHED) == 0) {
                itemstats[id].expired_unfetched++;
            }
            it = search;
            slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
            do_item_unlink_nolock(it, hv);
            /* Initialize the item block: */
            it->slabs_clsid = 0;
        } else if ((it = slabs_alloc(ntotal, id)) == NULL) {
            tried_alloc = 1;
            if (settings.evict_to_free == 0) {
                itemstats[id].outofmemory++;
            } else {
                itemstats[id].evicted++;
                itemstats[id].evicted_time = current_time - search->time;
                if (search->exptime != 0)
                    itemstats[id].evicted_nonzero++;
                if ((search->it_flags & ITEM_FETCHED) == 0) {
                    itemstats[id].evicted_unfetched++;
                }
                it = search;
                slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
                 /* remove this item from the LRU list and the hash table */
                do_item_unlink_nolock(it, hv);
                /* Initialize the item block: */
                it->slabs_clsid = 0;
 
                /* If we've just evicted an item, and the automover is set to
                 * angry bird mode, attempt to rip memory into this slab class.
                 * TODO: Move valid object detection into a function, and on a
                 * "successful" memory pull, look behind and see if the next alloc
                 * would be an eviction. Then kick off the slab mover before the
                 * eviction happens.
                 */
            /* No expired item was available and the slabs_alloc above failed,
               so the item at the tail of the LRU list is evicted and reused
               for the caller */
                if (settings.slab_automove == 2)
                    slabs_reassign(-1, id);
            }
        }
 
        refcount_decr(&search->refcount);
        /* If hash values were equal, we don't grab a second lock */
        if (hold_lock)
            item_trylock_unlock(hold_lock);
        break;
    }
   /* The LRU list was empty or all its tail items were locked, so allocate straight from the slab class */
    if (!tried_alloc && (tries == 0 || search == NULL))
        it = slabs_alloc(ntotal, id);
 
    if (it == NULL) {
        itemstats[id].outofmemory++;
        mutex_unlock(&cache_lock);
        return NULL;
    }
 
    assert(it->slabs_clsid == 0);
    assert(it != heads[id]);
 
    /* Item initialization can happen outside of the lock; the item's already
     * been removed from the slab LRU.
     */
    /* initialize the item's fields */
    it->refcount = 1;     /* the caller will have a reference */
    mutex_unlock(&cache_lock);
    it->next = it->prev = it->h_next = 0;
    it->slabs_clsid = id;
 
    DEBUG_REFCNT(it, '*');
    it->it_flags = settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    it->nsuffix = nsuffix;
    return it;
}
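
As a rough worked example of the size calculation (the numbers are approximate and depend on the build): with CAS enabled, set foo 0 0 5 with the value hello needs about sizeof(item) ≈ 48 bytes for the header, 4 bytes for the key (nkey + 1), around 6 bytes for the " <flags> <datalen>\r\n" suffix, 7 bytes for the value plus its trailing \r\n, and 8 bytes for the CAS field, i.e. roughly 73 bytes in total. Under default settings that lands in the first slab class, whose chunks are 96 bytes.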

Next, let's see how a chunk of the requested size is taken from the chosen slab class, in do_slabs_alloc:

static void *do_slabs_alloc(const size_t size, unsigned int id) {
    slabclass_t *p;
    void *ret = NULL;
    item *it = NULL;
 
    if (id < POWER_SMALLEST || id > power_largest) {
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);
        return NULL;
    }
 
    p = &slabclass[id];
    assert(p->sl_curr == 0 || ((item *)p->slots)->slabs_clsid == 0);
 
    /* fail unless we have space at the end of a recently allocated page,
       we have something on our freelist, or we could allocate a new page */
    /* make sure there is a free chunk available, allocating a new slab page if necessary */
    if (! (p->sl_curr != 0 || do_slabs_newslab(id) != 0)) {
        /* We don't have more memory available */
        //no more memory is available
        ret = NULL;
    //otherwise take a chunk off the slots free list
    } else if (p->sl_curr != 0) {
        /* return off our freelist */
        it = (item *)p->slots;
        p->slots = it->next;
        if (it->next) it->next->prev = 0;
        p->sl_curr--;
        ret = (void *)it;
    }
 
    if (ret) {
        p->requested += size;
        MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);
    } else {
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);
    }
 
    return ret;
}
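
The counterpart of this free-list pop is do_slabs_free, which is also what split_slab_page_into_freelist calls for every chunk of a freshly allocated page. In condensed form (a sketch that omits the assertions and bounds checks of the real function), it pushes the chunk back onto the head of the slots list:

static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
    slabclass_t *p = &slabclass[id];
    item *it = (item *)ptr;
 
    it->it_flags |= ITEM_SLABBED;      /* mark the chunk as sitting on a slab free list */
    it->prev = 0;
    it->next = p->slots;               /* push onto the head of the free list */
    if (it->next) it->next->prev = it;
    p->slots = it;
 
    p->sl_curr++;                      /* one more free chunk in this class */
    p->requested -= size;              /* these bytes are no longer in use */
}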

3. Overall memory architecture
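
In outline, the structures described above fit together roughly like this:

slabclass[1..power_largest]        array of slab classes; the chunk size grows by factor from class to class
  - slab_list[0..slabs-1]          the 1MB slab pages owned by this class
      - chunks of size bytes       perslab chunks per page, each holding one item
  - slots / sl_curr                linked free list of the class's currently unused chunks
heads[id] / tails[id]              per-class LRU list of the items currently in use
primary_hashtable / old_hashtable  hash table(s) used to look items up by key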

4. The hash table

memcached maintains two hash tables, primary_hashtable and old_hashtable. While the table is being expanded, buckets that have not yet been migrated are still looked up in the old table:

if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it = old_hashtable[oldbucket];
    } else {
        it = primary_hashtable[hv & hashmask(hashpower)];
    }
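
Here hashsize(n) is 1 << n and hashmask(n) is hashsize(n) - 1. During an expansion, hashpower has already been bumped to the new, larger power, and expand_bucket records how far the migration has progressed: buckets below expand_bucket have been moved into the bigger primary table, everything at or above it is still in the old table. As a worked example (numbers chosen purely for illustration): suppose hashpower has just grown from 16 to 17 and expand_bucket is 1000. A key with hv = 70000 maps to old bucket 70000 & hashmask(16) = 4464; since 4464 >= 1000 that bucket has not been migrated yet, so the lookup goes to old_hashtable[4464]. Once the maintenance thread has moved that bucket (expand_bucket > 4464), the same key is found in primary_hashtable[70000 & hashmask(17)] = primary_hashtable[70000].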

The expansion itself is triggered by signalling a condition variable:

When main starts, it spawns the assoc_maintenance_thread, which blocks in pthread_cond_wait(&maintenance_cond, &cache_lock):

if (!expanding) {
            /* finished expanding. tell all threads to use fine-grained locks */
            switch_item_lock_type(ITEM_LOCK_GRANULAR);
            slabs_rebalancer_resume();
            /* We are done expanding.. just wait for next invocation */
            mutex_lock(&cache_lock);
            started_expanding = false;
            pthread_cond_wait(&maintenance_cond, &cache_lock);
            /* Before doing anything, tell threads to use a global lock */
            mutex_unlock(&cache_lock);
            slabs_rebalancer_pause();
            switch_item_lock_type(ITEM_LOCK_GLOBAL);
            mutex_lock(&cache_lock);
            assoc_expand();
            mutex_unlock(&cache_lock);
        }

When an expansion is needed, the following function is called to wake that thread up:

static void assoc_start_expand(void) {
    if (started_expanding)
        return;
    started_expanding = true;
    pthread_cond_signal(&maintenance_cond);
}
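
For completeness: assoc_start_expand is called from assoc_insert once the number of stored items exceeds one and a half times the number of hash buckets. Paraphrasing the relevant lines of assoc_insert (not quoted verbatim, so details may differ between versions):

    hash_items++;
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_start_expand();
    }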

5. The CAS mechanism

Suppose a key in memcached currently holds the value A, and clients C1 and C2 both read it: C1 wants to append B and C2 wants to append C. Depending on who writes last, the cached value ends up as either AB or AC, never the ABC we might have expected. With a single server this could be worked around by taking a lock around the whole read-modify-write, but in a clustered environment such a lock obviously does not solve the problem.

Although memcached already protects every read and write of an item with a mutex, that does nothing for the consistency of such client-side read-modify-write sequences, so memcached implements a separate CAS (check-and-set) mechanism.
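
From the client's point of view, CAS is used through the gets and cas commands of the text protocol. A typical exchange looks roughly like this (key, value and CAS numbers are made up for illustration):

    gets counter
    VALUE counter 0 1 42        (the trailing 42 is the CAS unique value)
    A
    END
 
    cas counter 0 0 2 42        (store only if the CAS value is still 42)
    AB
    STORED                      (EXISTS is returned instead if another client changed the item first)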

Now look at this fragment of do_store_item:

else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
            // cas validates
            // it and old_it may belong to different classes.
            // I'm updating the stats for the one that's getting pushed out
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
 
            item_replace(old_it, it, hv);
            stored = STORED;
        } else {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
 
            if(settings.verbose > 1) {
                fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
                        (unsigned long long)ITEM_get_cas(old_it),
                        (unsigned long long)ITEM_get_cas(it));
            }
            stored = EXISTS;
        }
    } 

The code above compares the CAS value supplied by the client with the one stored on the existing item.

The CAS value itself is assigned in do_item_link, which draws a new value from a global counter:

ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
 
uint64_t get_cas_id(void) {
    static uint64_t cas_id = 0;
    return ++cas_id;
}

6. slab automove and slab rebalance

Consider the following scenario: initially, the business stores a large number of 1KB values, so the memcached process holds a great many 1KB items. Later the workload shifts to storing lots of 10KB values and the 1KB data is hardly used any more. As the data grows, memory gets tight: the 10KB items are accessed frequently, and because memory is short, 10KB items keep being evicted through the LRU.

In this situation the mass of 1KB items is pure waste. They are rarely accessed, yet even after they expire they still occupy the hash table and the LRU queues. The LRU queues are not the real problem, since each size class has its own queue, but in the hash table a large number of zombie items increases the chance of hash collisions and also wastes time when the table is migrated. Can these items be killed off? Yes: the LRU crawler together with the lru_crawler command can forcibly remove the zombies. But once they are removed, their memory goes back to the 1KB slab allocator, and a 1KB slab class will never hand memory to a 10KB item, so the effort still falls short.

Is there another way? There is: memcached's slab automove and slab rebalance features exist exactly for this. They are disabled by default; to use them, memcached must be started with the -o slab_reassign option. After that, a client can send the command slabs reassign <source class> <dest class> to manually move a memory page from the source class to the destination class; the rest of this text calls this operation a page reassignment. The slabs automove command, in turn, tells memcached to detect by itself whether a page reassignment is needed and, if so, to carry it out automatically, with no human intervention.
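
As a concrete usage sketch (illustrative only; option spellings follow a stock 1.4.x build):

    # start memcached with page reassignment enabled; the automatic mover can also be enabled at startup
    memcached -m 64 -o slab_reassign,slab_automove=1
 
    # later, from any client connection, move one memory page from class 3 to class 10 by hand
    slabs reassign 3 10
    OK
 
    # or toggle the automatic mover at runtime
    slabs automove 1
    OK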

For the detailed mechanics, see:

http://blog.csdn.net/luotuo44/article/details/43015129
