diff --git a/globals.c b/globals.c index 7d7b2a366f..a01e370ac5 100644 --- a/globals.c +++ b/globals.c @@ -21,3 +21,5 @@ volatile rel_time_t current_time; /** exported globals **/ struct stats stats; struct settings settings; +struct slab_rebalance slab_rebal; +volatile int slab_rebalance_signal; diff --git a/items.c b/items.c index b6097f0684..a743fb577f 100644 --- a/items.c +++ b/items.c @@ -123,7 +123,6 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0)); /* Initialize the item block: */ it->slabs_clsid = 0; - it->refcount = 0; } else if ((it = slabs_alloc(ntotal, id)) == NULL) { if (settings.evict_to_free == 0) { itemstats[id].outofmemory++; @@ -149,7 +148,6 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0)); /* Initialize the item block: */ it->slabs_clsid = 0; - it->refcount = 0; } } else { /* If the LRU is empty or locked, attempt to allocate memory */ @@ -181,11 +179,11 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_tim /* Item initialization can happen outside of the lock; the item's already * been removed from the slab LRU. */ + it->refcount = 1; /* the caller will have a reference */ pthread_mutex_unlock(&cache_lock); + it->next = it->prev = it->h_next = 0; it->slabs_clsid = id; - it->next = it->prev = it->h_next = 0; - it->refcount = 1; /* the caller will have a reference */ DEBUG_REFCNT(it, '*'); it->it_flags = settings.use_cas ? ITEM_CAS : 0; it->nkey = nkey; diff --git a/memcached.c b/memcached.c index b8db218e34..e89c555d8b 100644 --- a/memcached.c +++ b/memcached.c @@ -102,6 +102,9 @@ struct stats stats; struct settings settings; time_t process_started; /* when the process was started */ +struct slab_rebalance slab_rebal; +volatile int slab_rebalance_signal; + /** file scope variables **/ static conn *listen_conn = NULL; static struct event_base *main_base; @@ -170,7 +173,9 @@ static void stats_init(void) { stats.curr_bytes = stats.listen_disabled_num = 0; stats.hash_power_level = stats.hash_bytes = stats.hash_is_expanding = 0; stats.expired_unfetched = stats.evicted_unfetched = 0; + stats.slabs_moved = 0; stats.accepting_conns = true; /* assuming we start in this state. */ + stats.slab_reassign_running = false; /* make the time we started always be 2 seconds before we really did, so time(0) - time.started is never zero. if so, things @@ -218,6 +223,8 @@ static void settings_init(void) { settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */ settings.maxconns_fast = false; settings.hashpower_init = 0; + settings.slab_reassign = false; + settings.slab_automove = false; } /* @@ -2572,6 +2579,10 @@ static void server_stats(ADD_STAT add_stats, conn *c) { APPEND_STAT("hash_is_expanding", "%u", stats.hash_is_expanding); APPEND_STAT("expired_unfetched", "%llu", stats.expired_unfetched); APPEND_STAT("evicted_unfetched", "%llu", stats.evicted_unfetched); + if (settings.slab_reassign) { + APPEND_STAT("slab_reassign_running", "%u", stats.slab_reassign_running); + APPEND_STAT("slabs_moved", "%llu", stats.slabs_moved); + } STATS_UNLOCK(); } @@ -2604,6 +2615,8 @@ static void process_stat_settings(ADD_STAT add_stats, void *c) { APPEND_STAT("item_size_max", "%d", settings.item_size_max); APPEND_STAT("maxconns_fast", "%s", settings.maxconns_fast ? "yes" : "no"); APPEND_STAT("hashpower_init", "%d", settings.hashpower_init); + APPEND_STAT("slab_reassign", "%s", settings.slab_reassign ? "yes" : "no"); + APPEND_STAT("slab_automove", "%s", settings.slab_automove ? "yes" : "no"); } static void process_stat(conn *c, token_t *tokens, const size_t ntokens) { @@ -3290,6 +3303,45 @@ static void process_command(conn *c, char *command) { conn_set_state(c, conn_closing); + } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 && + strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) { + int src, dst, rv; + + if (settings.slab_reassign == false) { + out_string(c, "CLIENT_ERROR slab reassignment disabled"); + return; + } + + src = strtol(tokens[2].value, NULL, 10); + dst = strtol(tokens[3].value, NULL, 10); + + if (errno == ERANGE) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + rv = slabs_reassign(src, dst); + switch (rv) { + case REASSIGN_OK: + out_string(c, "OK"); + break; + case REASSIGN_RUNNING: + out_string(c, "BUSY"); + break; + case REASSIGN_BADCLASS: + out_string(c, "BADCLASS"); + break; + case REASSIGN_NOSPARE: + out_string(c, "NOSPARE"); + break; + case REASSIGN_DEST_NOT_FULL: + out_string(c, "NOTFULL"); + break; + case REASSIGN_SRC_NOT_SAFE: + out_string(c, "UNSAFE"); + break; + } + return; } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) { process_verbosity_command(c, tokens, ntokens); } else { @@ -4639,11 +4691,15 @@ int main (int argc, char **argv) { char *subopts_value; enum { MAXCONNS_FAST = 0, - HASHPOWER_INIT + HASHPOWER_INIT, + SLAB_REASSIGN, + SLAB_AUTOMOVE }; char *const subopts_tokens[] = { [MAXCONNS_FAST] = "maxconns_fast", [HASHPOWER_INIT] = "hashpower", + [SLAB_REASSIGN] = "slab_reassign", + [SLAB_AUTOMOVE] = "slab_automove", NULL }; @@ -4889,6 +4945,12 @@ int main (int argc, char **argv) { return 1; } break; + case SLAB_REASSIGN: + settings.slab_reassign = true; + break; + case SLAB_AUTOMOVE: + settings.slab_automove = true; + break; default: printf("Illegal suboption \"%s\"\n", subopts_value); return 1; @@ -5042,6 +5104,11 @@ int main (int argc, char **argv) { exit(EXIT_FAILURE); } + if (settings.slab_reassign && + start_slab_maintenance_thread() == -1) { + exit(EXIT_FAILURE); + } + /* initialise clock event */ clock_handler(0, 0, 0); diff --git a/memcached.h b/memcached.h index f626643639..b4dfb3bb39 100644 --- a/memcached.h +++ b/memcached.h @@ -265,6 +265,8 @@ struct stats { bool hash_is_expanding; /* If the hash table is being expanded */ uint64_t expired_unfetched; /* items reclaimed but never touched */ uint64_t evicted_unfetched; /* items evicted but never touched */ + bool slab_reassign_running; /* slab reassign in progress */ + uint64_t slabs_moved; /* times slabs were moved around */ }; #define MAX_VERBOSITY_LEVEL 2 @@ -298,6 +300,8 @@ struct settings { int item_size_max; /* Maximum item size, and upper end for slabs */ bool sasl; /* SASL on/off */ bool maxconns_fast; /* Whether or not to early close connections */ + bool slab_reassign; /* Whether or not slab reassignment is allowed */ + bool slab_automove; /* Whether or not to automatically move slabs */ int hashpower_init; /* Starting hash power level */ }; @@ -452,6 +456,21 @@ struct conn { /* current time of day (updated periodically) */ extern volatile rel_time_t current_time; +/* TODO: Move to slabs.h? */ +extern volatile int slab_rebalance_signal; + +struct slab_rebalance { + void *slab_start; + void *slab_end; + void *slab_pos; + int s_clsid; + int d_clsid; + int busy_items; + uint8_t done; +}; + +extern struct slab_rebalance slab_rebal; + /* * Functions */ diff --git a/slabs.c b/slabs.c index 48fbdb960c..eddd59eb04 100644 --- a/slabs.c +++ b/slabs.c @@ -27,9 +27,8 @@ typedef struct { unsigned int size; /* sizes of items */ unsigned int perslab; /* how many items per slab */ - void **slots; /* list of item ptrs */ - unsigned int sl_total; /* size of previous array */ - unsigned int sl_curr; /* first free slot */ + void *slots; /* list of item ptrs */ + unsigned int sl_curr; /* total free items in list */ void *end_page_ptr; /* pointer to next free item at end of page, or 0 */ unsigned int end_page_free; /* number of items remaining at end of last alloced page */ @@ -192,7 +191,8 @@ static int grow_slab_list (const unsigned int id) { static int do_slabs_newslab(const unsigned int id) { slabclass_t *p = &slabclass[id]; - int len = p->size * p->perslab; + int len = settings.slab_reassign ? settings.item_size_max + : p->size * p->perslab; char *ptr; if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0) || @@ -218,6 +218,7 @@ static int do_slabs_newslab(const unsigned int id) { static void *do_slabs_alloc(const size_t size, unsigned int id) { slabclass_t *p; void *ret = NULL; + item *it = NULL; if (id < POWER_SMALLEST || id > power_largest) { MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0); @@ -225,7 +226,7 @@ static void *do_slabs_alloc(const size_t size, unsigned int id) { } p = &slabclass[id]; - assert(p->sl_curr == 0 || ((item *)p->slots[p->sl_curr - 1])->slabs_clsid == 0); + assert(p->sl_curr == 0 || ((item *)p->slots)->slabs_clsid == 0); #ifdef USE_SYSTEM_MALLOC if (mem_limit && mem_malloced + size > mem_limit) { @@ -246,7 +247,10 @@ static void *do_slabs_alloc(const size_t size, unsigned int id) { ret = NULL; } else if (p->sl_curr != 0) { /* return off our freelist */ - ret = p->slots[--p->sl_curr]; + it = (item *)p->slots; + p->slots = it->next; + p->sl_curr--; + ret = (void *)it; } else { /* if we recently allocated a whole page, return from that */ assert(p->end_page_ptr != NULL); @@ -270,6 +274,7 @@ static void *do_slabs_alloc(const size_t size, unsigned int id) { static void do_slabs_free(void *ptr, const size_t size, unsigned int id) { slabclass_t *p; + item *it; assert(((item *)ptr)->slabs_clsid == 0); assert(id >= POWER_SMALLEST && id <= power_largest); @@ -285,15 +290,13 @@ static void do_slabs_free(void *ptr, const size_t size, unsigned int id) { return; #endif - if (p->sl_curr == p->sl_total) { /* need more space on the free list */ - int new_size = (p->sl_total != 0) ? p->sl_total * 2 : 16; /* 16 is arbitrary */ - void **new_slots = realloc(p->slots, new_size * sizeof(void *)); - if (new_slots == 0) - return; - p->slots = new_slots; - p->sl_total = new_size; - } - p->slots[p->sl_curr++] = ptr; + it = (item *)ptr; + it->prev = 0; + it->next = p->slots; + if (it->next) it->next->prev = it; + p->slots = it; + + p->sl_curr++; p->requested -= size; return; } @@ -453,3 +456,269 @@ void slabs_adjust_mem_requested(unsigned int id, size_t old, size_t ntotal) p->requested = p->requested - old + ntotal; pthread_mutex_unlock(&slabs_lock); } + +static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER; +static volatile int do_run_slab_thread = 1; + +#define DEFAULT_SLAB_BULK_CHECK 1 +int slab_bulk_check = DEFAULT_SLAB_BULK_CHECK; + +static int slab_rebalance_start(void) { + slabclass_t *s_cls; + slabclass_t *d_cls; + int no_go = 0; + + pthread_mutex_lock(&cache_lock); + pthread_mutex_lock(&slabs_lock); + + if (slab_rebal.s_clsid < POWER_SMALLEST || + slab_rebal.s_clsid > power_largest || + slab_rebal.d_clsid < POWER_SMALLEST || + slab_rebal.d_clsid > power_largest) + no_go = -2; + + s_cls = &slabclass[slab_rebal.s_clsid]; + d_cls = &slabclass[slab_rebal.d_clsid]; + + if (d_cls->end_page_ptr || s_cls->end_page_ptr || + !grow_slab_list(slab_rebal.d_clsid)) { + no_go = -1; + } + + if (s_cls->slabs < 2) + no_go = -3; + + if (no_go != 0) { + pthread_mutex_unlock(&slabs_lock); + pthread_mutex_unlock(&cache_lock); + return no_go; /* Should use a wrapper function... */ + } + + s_cls->killing = 1; + + slab_rebal.slab_start = s_cls->slab_list[s_cls->killing - 1]; + slab_rebal.slab_end = (char *)slab_rebal.slab_start + + (s_cls->size * s_cls->perslab); + slab_rebal.slab_pos = slab_rebal.slab_start; + slab_rebal.done = 0; + + /* Also tells do_item_get to search for items in this slab */ + slab_rebalance_signal = 2; + + if (settings.verbose > 1) { + fprintf(stderr, "Started a slab rebalance\n"); + } + + pthread_mutex_unlock(&slabs_lock); + pthread_mutex_unlock(&cache_lock); + + STATS_LOCK(); + stats.slab_reassign_running = true; + STATS_UNLOCK(); + + return 0; +} + +/* refcount == 0 is safe since nobody can incr while cache_lock is held. + * refcount != 0 is impossible since flags/etc can be modified in other + * threads. instead, note we found a busy one and bail. logic in do_item_get + * will prevent busy items from continuing to be busy + */ +static int slab_rebalance_move(void) { + slabclass_t *s_cls; + int x; + int was_busy = 0; + + pthread_mutex_lock(&cache_lock); + pthread_mutex_lock(&slabs_lock); + + s_cls = &slabclass[slab_rebal.s_clsid]; + + for (x = 0; x < slab_bulk_check; x++) { + item *it = slab_rebal.slab_pos; + if (it->refcount == 0) { + if (it->it_flags & ITEM_SLABBED) { + /* remove from freelist linked list */ + if (s_cls->slots == it) { + s_cls->slots = it->next; + } + if (it->next) it->next->prev = it->prev; + if (it->prev) it->prev->next = it->next; + s_cls->sl_curr--; + } else if (it->it_flags != 0) { + it->refcount = 1; + /* Call unlink with refcount == 1 so it won't free */ + do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0)); + it->refcount = 0; + } + it->it_flags = 0; + it->slabs_clsid = 0; + } else { + if (settings.verbose > 2) { + fprintf(stderr, "Slab reassign hit a busy item: refcount: %d (%d -> %d)\n", + it->refcount, slab_rebal.s_clsid, slab_rebal.d_clsid); + } + slab_rebal.busy_items++; + was_busy++; + } + + slab_rebal.slab_pos = (char *)slab_rebal.slab_pos + s_cls->size; + if (slab_rebal.slab_pos >= slab_rebal.slab_end) + break; + } + + if (slab_rebal.slab_pos >= slab_rebal.slab_end) { + /* Some items were busy, start again from the top */ + if (slab_rebal.busy_items) { + slab_rebal.slab_pos = slab_rebal.slab_start; + slab_rebal.busy_items = 0; + } else { + slab_rebal.done++; + } + } + + pthread_mutex_unlock(&slabs_lock); + pthread_mutex_unlock(&cache_lock); + + return was_busy; +} + +static void slab_rebalance_finish(void) { + slabclass_t *s_cls; + slabclass_t *d_cls; + + pthread_mutex_lock(&cache_lock); + pthread_mutex_lock(&slabs_lock); + + s_cls = &slabclass[slab_rebal.s_clsid]; + d_cls = &slabclass[slab_rebal.d_clsid]; + + /* At this point the stolen slab is completely clear */ + s_cls->slab_list[s_cls->killing - 1] = + s_cls->slab_list[s_cls->slabs - 1]; + s_cls->slabs--; + s_cls->killing = 0; + + memset(slab_rebal.slab_start, 0, (size_t)settings.item_size_max); + + d_cls->slab_list[d_cls->slabs++] = slab_rebal.slab_start; + d_cls->end_page_ptr = slab_rebal.slab_start; + d_cls->end_page_free = d_cls->perslab; + + slab_rebal.done = 0; + slab_rebal.s_clsid = 0; + slab_rebal.d_clsid = 0; + slab_rebal.slab_start = NULL; + slab_rebal.slab_end = NULL; + slab_rebal.slab_pos = NULL; + + slab_rebalance_signal = 0; + + pthread_mutex_unlock(&slabs_lock); + pthread_mutex_unlock(&cache_lock); + + STATS_LOCK(); + stats.slab_reassign_running = false; + stats.slabs_moved++; + STATS_UNLOCK(); + + if (settings.verbose > 1) { + fprintf(stderr, "finished a slab move\n"); + } +} + +/* Slab rebalancer thread. + * Does not use spinlocks since it is not timing sensitive. Burn less CPU and + * go to sleep if locks are contended + */ +static void *slab_maintenance_thread(void *arg) { + int was_busy = 0; + + while (do_run_slab_thread) { + /* TODO: Call code to make a calculated decision */ + + if (slab_rebalance_signal == 1) { + if (slab_rebalance_start() < 0) { + /* Handle errors with more specifity as required. */ + slab_rebalance_signal = 0; + } + + } else if (slab_rebalance_signal && slab_rebal.slab_start != NULL) { + /* If we have a decision to continue, continue it */ + was_busy = slab_rebalance_move(); + } + + if (slab_rebal.done) { + slab_rebalance_finish(); + } + + /* Sleep a bit if no work to do, or waiting on busy objects */ + if (was_busy || !slab_rebalance_signal) + sleep(1); + } + return NULL; +} + +static enum reassign_result_type do_slabs_reassign(int src, int dst) { + if (slab_rebalance_signal != 0) + return REASSIGN_RUNNING; + + if (src < POWER_SMALLEST || src > power_largest || + dst < POWER_SMALLEST || dst > power_largest) + return REASSIGN_BADCLASS; + + if (slabclass[src].slabs < 2) + return REASSIGN_NOSPARE; + + if (slabclass[dst].end_page_ptr) + return REASSIGN_DEST_NOT_FULL; + + if (slabclass[src].end_page_ptr) + return REASSIGN_SRC_NOT_SAFE; + + slab_rebal.s_clsid = src; + slab_rebal.d_clsid = dst; + + slab_rebalance_signal = 1; + + return REASSIGN_OK; +} + +enum reassign_result_type slabs_reassign(int src, int dst) { + enum reassign_result_type ret; + mutex_lock(&slabs_lock); + ret = do_slabs_reassign(src, dst); + pthread_mutex_unlock(&slabs_lock); + return ret; +} + +static pthread_t maintenance_tid; + +int start_slab_maintenance_thread(void) { + int ret; + slab_rebalance_signal = 0; + slab_rebal.slab_start = NULL; + char *env = getenv("MEMCACHED_SLAB_BULK_CHECK"); + if (env != NULL) { + slab_bulk_check = atoi(env); + if (slab_bulk_check == 0) { + slab_bulk_check = DEFAULT_SLAB_BULK_CHECK; + } + } + if ((ret = pthread_create(&maintenance_tid, NULL, + slab_maintenance_thread, NULL)) != 0) { + fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); + return -1; + } + return 0; +} + +void stop_slab_maintenance_thread(void) { + mutex_lock(&cache_lock); + do_run_slab_thread = 0; + pthread_cond_signal(&maintenance_cond); + pthread_mutex_unlock(&cache_lock); + + /* Wait for the maintenance thread to stop */ + pthread_join(maintenance_tid, NULL); +} diff --git a/slabs.h b/slabs.h index 21da116755..ea83e45573 100644 --- a/slabs.h +++ b/slabs.h @@ -33,4 +33,14 @@ bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c); /** Fill buffer with stats */ /*@null@*/ void slabs_stats(ADD_STAT add_stats, void *c); +int start_slab_maintenance_thread(void); +void stop_slab_maintenance_thread(void); + +enum reassign_result_type { + REASSIGN_OK=0, REASSIGN_RUNNING, REASSIGN_BADCLASS, REASSIGN_NOSPARE, + REASSIGN_DEST_NOT_FULL, REASSIGN_SRC_NOT_SAFE +}; + +enum reassign_result_type slabs_reassign(int src, int dst); + #endif diff --git a/t/binary.t b/t/binary.t index 85ed46f990..adc30c0553 100755 --- a/t/binary.t +++ b/t/binary.t @@ -2,7 +2,7 @@ use strict; use warnings; -use Test::More tests => 3533; +use Test::More tests => 3539; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; diff --git a/t/slabs_reassign.t b/t/slabs_reassign.t new file mode 100644 index 0000000000..85a8c2bcfe --- /dev/null +++ b/t/slabs_reassign.t @@ -0,0 +1,71 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use Test::More tests => 111; +use FindBin qw($Bin); +use lib "$Bin/lib"; +use MemcachedTest; + +# Enable manual slab reassign, cap at 6 slabs +my $server = new_memcached('-o slab_reassign -m 4'); +my $stats = mem_stats($server->sock, ' settings'); +is($stats->{slab_reassign}, "yes"); + +my $sock = $server->sock; + +# Fill a largeish slab until it evicts (honors the -m 6) +my $bigdata = 'x' x 70000; # slab 31 +for (1 .. 50) { + print $sock "set bfoo$_ 0 0 70000\r\n", $bigdata, "\r\n"; + is(scalar <$sock>, "STORED\r\n", "stored key"); +} + +# Fill a smaller slab until it evicts +my $smalldata = 'y' x 20000; # slab 25 +for (1 .. 50) { + print $sock "set sfoo$_ 0 0 20000\r\n", $smalldata, "\r\n"; + is(scalar <$sock>, "STORED\r\n", "stored key"); +} + +my $items_before = mem_stats($sock, "items"); +isnt($items_before->{"items:31:evicted"}, 0, "slab 31 evicted is nonzero"); +isnt($items_before->{"items:25:evicted"}, 0, "slab 25 evicted is nonzero"); + +my $slabs_before = mem_stats($sock, "slabs"); +# Move a large slab to the smaller slab +print $sock "slabs reassign 31 25\r\n"; +is(scalar <$sock>, "OK\r\n", "slab rebalancer started"); + +# Still working out how/if to signal the thread. For now, just sleep. +sleep 2; + +# Check that stats counters increased +my $slabs_after = mem_stats($sock, "slabs"); +my $stats = mem_stats($sock); + +isnt($stats->{slabs_moved}, 0, "slabs moved is nonzero"); + +# Check that slab stats reflect the change +ok($slabs_before->{"31:total_pages"} != $slabs_after->{"31:total_pages"}, + "slab 31 pagecount changed"); +ok($slabs_before->{"25:total_pages"} != $slabs_after->{"25:total_pages"}, + "slab 25 pagecount changed"); + +# Try to move another slab, see that it complains +print $sock "slabs reassign 31 25\r\n"; +is(scalar <$sock>, "NOTFULL\r\n", "Cannot re-run against class with empty space"); + +# Try to move a page backwards. Should complain that source class isn't "safe" +# to move from. +print $sock "slabs reassign 25 31\r\n"; +is(scalar <$sock>, "UNSAFE\r\n", "Cannot move an unsafe slab back"); + +# Try to insert items into both slabs +print $sock "set bfoo51 0 0 70000\r\n", $bigdata, "\r\n"; +is(scalar <$sock>, "STORED\r\n", "stored key"); + +print $sock "set sfoo51 0 0 20000\r\n", $smalldata, "\r\n"; +is(scalar <$sock>, "STORED\r\n", "stored key"); + +# Do need to come up with better automated tests for this.