author | Alberto Bertogli
<albertito@gmail.com> 2007-04-29 06:37:09 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2007-04-29 06:37:09 UTC |
parent | 0dea87c6521681c2e2eaf265ef631304ebca4fea |
nmdb/cache.c | +49 | -0 |
nmdb/cache.h | +3 | -0 |
nmdb/db.c | +35 | -0 |
nmdb/net-const.h | +4 | -0 |
nmdb/queue.c | +4 | -0 |
nmdb/queue.h | +2 | -0 |
nmdb/tipc.c | +119 | -4 |
nmdb/tipc.h | +1 | -0 |
diff --git a/nmdb/cache.c b/nmdb/cache.c index 27f426f..59538b3 100644 --- a/nmdb/cache.c +++ b/nmdb/cache.c @@ -307,3 +307,52 @@ exit: return rv; } + +/* Performs a cache compare-and-swap. + * Returns -2 if there was an error, -1 if the key is not in the cache, 0 if + * the old value does not match, and 1 if the CAS was successful. */ +int cache_cas(struct cache *cd, const unsigned char *key, size_t ksize, + const unsigned char *oldval, size_t ovsize, + const unsigned char *newval, size_t nvsize) +{ + int rv = 1; + uint32_t h = 0; + struct cache_chain *c; + struct cache_entry *e; + unsigned char *buf; + + h = hash(key, ksize) % cd->hashlen; + c = cd->table + h; + + e = find_in_chain(c, key, ksize); + + if (e == NULL) { + rv = -1; + goto exit; + } + + if (e->vsize != ovsize) { + rv = 0; + goto exit; + } + + if (memcmp(e->val, oldval, ovsize) != 0) { + rv = 0; + goto exit; + } + + buf = malloc(nvsize); + if (buf == NULL) { + rv = -2; + goto exit; + } + + memcpy(buf, newval, nvsize); + free(e->val); + e->val = buf; + e->vsize = nvsize; + +exit: + return rv; +} + diff --git a/nmdb/cache.h b/nmdb/cache.h index fd31900..66c705d 100644 --- a/nmdb/cache.h +++ b/nmdb/cache.h @@ -44,6 +44,9 @@ int cache_get(struct cache *cd, const unsigned char *key, size_t ksize, int cache_set(struct cache *cd, const unsigned char *k, size_t ksize, const unsigned char *v, size_t vsize); int cache_del(struct cache *cd, const unsigned char *key, size_t ksize); +int cache_cas(struct cache *cd, const unsigned char *key, size_t ksize, + const unsigned char *oldval, size_t ovsize, + const unsigned char *newval, size_t nvsize); #endif diff --git a/nmdb/db.c b/nmdb/db.c index 0589bc2..5ebdfac 100644 --- a/nmdb/db.c +++ b/nmdb/db.c @@ -3,6 +3,7 @@ #include <time.h> /* nanosleep() */ #include <errno.h> /* ETIMEDOUT */ #include <stdio.h> /* perror() */ +#include <string.h> /* memcmp() */ #include "common.h" #include "db.h" @@ -136,6 +137,40 @@ static void process_op(db_t *db, struct queue_entry *e) }
else if (e->operation == REQ_DEL_ASYNC) { db_del(db, e->key, e->ksize); + } else if (e->operation == REQ_CAS) { + unsigned char *dbval; + size_t dbvsize = 64 * 1024; + + /* Compare */ + dbval = malloc(dbvsize); + if (dbval == NULL) { + tipc_reply_err(e->req, ERR_MEM); + return; + } + rv = db_get(db, e->key, e->ksize, dbval, &dbvsize); + if (rv == 0) { + tipc_reply_cas(e->req, REP_NOTIN); + free(dbval); + return; + } + + if (e->vsize == dbvsize && + memcmp(e->val, dbval, dbvsize) == 0) { + /* Swap */ + rv = db_set(db, e->key, e->ksize, e->newval, e->nvsize); + if (!rv) { + tipc_reply_err(e->req, ERR_DB); + } else { + tipc_reply_cas(e->req, REP_OK); + } + } else { + tipc_reply_cas(e->req, REP_NOMATCH); + } + + /* Freed in a single place so that no path (a failing db_set() + * included) can leak it. */ + free(dbval); + } else { printf("Unknown op 0x%x\n", e->operation); } diff --git a/nmdb/net-const.h b/nmdb/net-const.h index a382490..127b9b9 100644 --- a/nmdb/net-const.h +++ b/nmdb/net-const.h @@ -23,6 +23,8 @@ #define REQ_DEL_SYNC 0x106 #define REQ_SET_ASYNC 0x107 #define REQ_DEL_ASYNC 0x108 +#define REQ_CACHE_CAS 0x109 +#define REQ_CAS 0x110 /* Network replies (different namespace from requests) */ #define REP_ERR 0x800 @@ -30,6 +32,7 @@ #define REP_CACHE_MISS 0x802 #define REP_OK 0x803 #define REP_NOTIN 0x804 +#define REP_NOMATCH 0x805 /* Network error replies */ #define ERR_VER 0x101 /* Version mismatch */ @@ -39,5 +42,6 @@ #define ERR_MEM 0x105 /* Memory allocation error */ #define ERR_DB 0x106 /* Database error */ + #endif diff --git a/nmdb/queue.c b/nmdb/queue.c index 3afbde5..eb56b09 100644 --- a/nmdb/queue.c +++ b/nmdb/queue.c @@ -77,8 +77,10 @@ struct queue_entry *queue_entry_create(void) e->operation = 0; e->key = NULL; e->val = NULL; + e->newval = NULL; e->ksize = 0; e->vsize = 0; + e->nvsize = 0; e->prev = NULL; return e; @@ -93,6 +95,8 @@ void queue_entry_free(struct queue_entry *e) { free(e->key); if (e->val) free(e->val); + if (e->newval) + free(e->newval); free(e); return; } diff --git a/nmdb/queue.h b/nmdb/queue.h
index 69a0002..93e7227 100644 --- a/nmdb/queue.h +++ b/nmdb/queue.h @@ -20,8 +20,10 @@ struct queue_entry { unsigned char *key; unsigned char *val; + unsigned char *newval; /* value to swap in for REQ_CAS; NULL when unused */ size_t ksize; size_t vsize; + size_t nvsize; /* size of newval */ struct queue_entry *prev; /* A pointer to the next element on the list is actually not diff --git a/nmdb/tipc.c b/nmdb/tipc.c index 3a216bd..f6f6e3c 100644 --- a/nmdb/tipc.c +++ b/nmdb/tipc.c @@ -19,7 +19,7 @@ static void parse_msg(struct req_info *req, unsigned char *buf, static void parse_get(struct req_info *req, int impact_db); static void parse_set(struct req_info *req, int impact_db, int async); static void parse_del(struct req_info *req, int impact_db, int async); - +static void parse_cas(struct req_info *req, int impact_db); /* * Miscelaneous helper functions @@ -89,12 +89,13 @@ static void mini_reply(struct req_info *req, uint32_t reply) /* Create a queue entry structure based on the parameters passed. Memory * allocated here will be free()'d in queue_entry_free(). It's not the * cleanest way, but the alternatives are even messier.
*/ -static struct queue_entry *make_queue_entry(struct req_info *req, +static struct queue_entry *make_queue_long_entry(struct req_info *req, uint32_t operation, const unsigned char *key, size_t ksize, - const unsigned char *val, size_t vsize) + const unsigned char *val, size_t vsize, + const unsigned char *newval, size_t nvsize) { struct queue_entry *e; - unsigned char *kcopy, *vcopy; + unsigned char *kcopy, *vcopy, *nvcopy; e = queue_entry_create(); if (e == NULL) { @@ -123,11 +124,27 @@ static struct queue_entry *make_queue_entry(struct req_info *req, memcpy(vcopy, val, vsize); } + nvcopy = NULL; /* newval is optional; it is only copied when given */ + if (newval != NULL) { + nvcopy = malloc(nvsize); + if (nvcopy == NULL) { + queue_entry_free(e); /* kcopy and vcopy are not stored in e yet */ + if (kcopy != NULL) + free(kcopy); + if (vcopy != NULL) + free(vcopy); + return NULL; + } + memcpy(nvcopy, newval, nvsize); + } + e->operation = operation; e->key = kcopy; e->ksize = ksize; e->val = vcopy; e->vsize = vsize; + e->newval = nvcopy; + e->nvsize = nvsize; /* Create a copy of req, including clisa */ e->req = malloc(sizeof(struct req_info)); @@ -151,6 +168,16 @@ static struct queue_entry *make_queue_entry(struct req_info *req, return e; } +/* Like make_queue_long_entry(), but without the newval/nvsize parameters, + * which most operations don't need (they are passed as NULL and 0). */ +static struct queue_entry *make_queue_entry(struct req_info *req, + uint32_t operation, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize) +{ + return make_queue_long_entry(req, operation, key, ksize, val, vsize, + NULL, 0); +} + /* The tipc_reply_* functions are used by the db code to send the network * replies.
*/ @@ -208,6 +235,11 @@ void tipc_reply_del(struct req_info *req, uint32_t reply) mini_reply(req, reply); } +void tipc_reply_cas(struct req_info *req, uint32_t reply) +{ + mini_reply(req, reply); +} + /* * Main functions for receiving and parsing @@ -339,6 +371,10 @@ static void parse_msg(struct req_info *req, unsigned char *buf, size_t bsize) parse_set(req, 1, 1); else if (cmd == REQ_DEL_ASYNC) parse_del(req, 1, 1); + else if (cmd == REQ_CACHE_CAS) + parse_cas(req, 0); + else if (cmd == REQ_CAS) + parse_cas(req, 1); else { stats.net_unk_req++; rep_send_error(req, ERR_UNKREQ); @@ -522,4 +558,83 @@ static void parse_del(struct req_info *req, int impact_db, int async) return; }
+/* Handles REQ_CACHE_CAS (impact_db == 0) and REQ_CAS (impact_db == 1). */ +static void parse_cas(struct req_info *req, int impact_db) +{ + int rv; + unsigned char *key, *oldval, *newval; + uint32_t ksize, ovsize, nvsize; + struct queue_entry *e; + const uint32_t max = 65536; + const size_t hdrsize = sizeof(uint32_t) * 3; + + /* Request format: + * 4 ksize + * 4 ovsize + * 4 nvsize + * ksize key + * ovsize oldval + * nvsize newval + */ + if (req->psize < hdrsize) { + stats.net_broken_req++; + rep_send_error(req, ERR_BROKEN); + return; + } + ksize = ntohl(* (uint32_t *) req->payload); + ovsize = ntohl(* ( ((uint32_t *) req->payload) + 1)); + nvsize = ntohl(* ( ((uint32_t *) req->payload) + 2)); + + /* Sanity check on sizes: + * - ksize, ovsize and nvsize must all be <= 2^16 = 64k + * - ksize + ovsize + nvsize <= 2^16 = 64k, and they must all fit + * in the payload after the size fields (the sum can't overflow) + */ + if ( (ksize > max) || (ovsize > max) || (nvsize > max) || + ( (ksize + ovsize + nvsize) > max) || + ( (ksize + ovsize + nvsize) > req->psize - hdrsize) ) { + stats.net_broken_req++; + rep_send_error(req, ERR_BROKEN); + return; + } + + key = req->payload + hdrsize; + oldval = key + ksize; + newval = oldval + ovsize; + + rv = cache_cas(cache_table, key, ksize, oldval, ovsize, + newval, nvsize); + if (rv == 0) { + /* If the cache doesn't match, there is no need to bother the + * DB even if we were asked to impact. */ + mini_reply(req, REP_NOMATCH); + return; + } else if (rv == -2) { + /* The swap could not be done (out of memory): don't report + * success, nor queue it, or the DB would diverge from the cache. */ + rep_send_error(req, ERR_MEM); + return; + } + + if (!impact_db) { + mini_reply(req, rv == -1 ? REP_NOTIN : REP_OK); + return; + } + + /* impact_db = 1 and the key is either not in the cache, or + * cache_cas() was successful. We now need to queue the CAS in + * the database. */ + e = make_queue_long_entry(req, REQ_CAS, key, ksize, + oldval, ovsize, newval, nvsize); + if (e == NULL) { + rep_send_error(req, ERR_MEM); + return; + } + + queue_lock(op_queue); + queue_put(op_queue, e); + queue_unlock(op_queue); + queue_signal(op_queue); +} + diff --git a/nmdb/tipc.h b/nmdb/tipc.h index a717836..a743000 100644 --- a/nmdb/tipc.h +++ b/nmdb/tipc.h @@ -29,6 +29,7 @@ void tipc_reply_get(struct req_info *req, uint32_t reply, unsigned char *val, size_t vsize); void tipc_reply_set(struct req_info *req, uint32_t reply); void tipc_reply_del(struct req_info *req, uint32_t reply); +void tipc_reply_cas(struct req_info *req, uint32_t reply); #endif