author | Alberto Bertogli
<albertito@gmail.com> 2007-08-28 06:13:12 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2007-08-28 06:13:12 UTC |
parent | 0e2f733d3ae2c3d79fac26e7aacdfa282caec97b |
nmdb/cache.c | +60 | -1 |
nmdb/cache.h | +3 | -0 |
nmdb/dbloop.c | +47 | -0 |
nmdb/net-const.h | +2 | -0 |
nmdb/parse.c | +99 | -0 |
diff --git a/nmdb/cache.c b/nmdb/cache.c
index 5d7a88b..9d1b898 100644
--- a/nmdb/cache.c
+++ b/nmdb/cache.c
@@ -6,11 +6,13 @@
  */
 
 #include <sys/types.h>		/* for size_t */
-#include <stdint.h>		/* for uint32_t */
+#include <stdint.h>		/* for [u]int*_t */
 #include <stdlib.h>		/* for malloc() */
 #include <string.h>		/* for memcpy()/memcmp() */
+#include <stdio.h>		/* snprintf() */
 #include "cache.h"
 
+
 struct cache *cache_create(size_t numobjs, unsigned int flags)
 {
 	size_t i;
@@ -352,3 +354,60 @@ exit:
 	return rv;
 }
 
+
+/* Increment the value associated with the given key by the given increment.
+ * The increment is a signed 64 bit value, and the value size must be >= 8
+ * bytes.
+ * Returns:
+ *    1 if the increment succeeded.
+ *   -1 if the value was not in the cache.
+ *   -2 if the value was empty or not NUL terminated.
+ *   -3 if there was a memory error.
+ */
+int cache_incr(struct cache *cd, const unsigned char *key, size_t ksize,
+		int64_t increment)
+{
+	uint32_t h = 0;
+	unsigned char *val, *newval;
+	int64_t intval;
+	size_t vsize;
+	struct cache_chain *c;
+	struct cache_entry *e;
+
+	h = hash(key, ksize) % cd->hashlen;
+	c = cd->table + h;
+
+	e = find_in_chain(c, key, ksize);
+
+	if (e == NULL)
+		return -1;
+
+	val = e->val;
+	vsize = e->vsize;
+
+	/* the value must be a non-empty, NUL terminated string, otherwise
+	 * strtoll might cause a segmentation fault */
+	if (val == NULL || vsize == 0 || val[vsize - 1] != '\0')
+		return -2;
+
+	intval = strtoll((char *) val, NULL, 10);
+	intval = intval + increment;
+
+	/* The max value for an unsigned long long is 18446744073709551615,
+	 * and strlen('18446744073709551615') = 20, so if the value is smaller
+	 * than 24 (just in case) we create a new buffer. */
+	if (vsize < 24) {
+		newval = malloc(24);
+		if (newval == NULL)
+			return -3;
+		free(val);
+		e->val = val = newval;
+		e->vsize = vsize = 24;
+	}
+
+	snprintf((char *) val, vsize, "%23lld", (long long int) intval);
+
+	return 1;
+}
+
+
diff --git a/nmdb/cache.h b/nmdb/cache.h
index 66c705d..c4d3d7f 100644
--- a/nmdb/cache.h
+++ b/nmdb/cache.h
@@ -5,6 +5,7 @@
 /* Generic cache layer. See cache.c for more information. */
 
 #include <sys/types.h>		/* for size_t */
+#include <stdint.h>		/* for int64_t */
 
 
 struct cache {
@@ -47,6 +48,8 @@ int cache_del(struct cache *cd, const unsigned char *key, size_t ksize);
 int cache_cas(struct cache *cd, const unsigned char *key, size_t ksize,
 		const unsigned char *oldval, size_t ovsize,
 		const unsigned char *newval, size_t nvsize);
+int cache_incr(struct cache *cd, const unsigned char *key, size_t ksize,
+		int64_t increment);
 
 #endif
 
diff --git a/nmdb/dbloop.c b/nmdb/dbloop.c
index 9a80c0d..11f4f11 100644
--- a/nmdb/dbloop.c
+++ b/nmdb/dbloop.c
@@ -4,6 +4,7 @@
 #include <errno.h>		/* ETIMEDOUT */
 #include <string.h>		/* memcmp() */
 #include <stdlib.h>		/* malloc()/free() */
+#include <stdio.h>		/* snprintf() */
 
 #include "common.h"
 #include "dbloop.h"
@@ -173,6 +174,52 @@ static void process_op(db_t *db, struct queue_entry *e)
 		e->req->reply_cas(e->req, REP_NOMATCH);
 		free(dbval);
 
+	} else if (e->operation == REQ_INCR) {
+		unsigned char *dbval;
+		size_t dbvsize = 64 * 1024;
+		int64_t intval;
+
+		dbval = malloc(dbvsize);
+		if (dbval == NULL) {
+			e->req->reply_err(e->req, ERR_MEM);
+			return;
+		}
+		rv = db_get(db, e->key, e->ksize, dbval, &dbvsize);
+		if (rv == 0) {
+			e->req->mini_reply(e->req, REP_NOTIN);
+			free(dbval);
+			return;
+		}
+
+		/* val must be non-empty and NUL terminated; see cache_incr() */
+		if (dbvsize == 0 || dbval[dbvsize - 1] != '\0') {
+			e->req->mini_reply(e->req, REP_NOMATCH);
+			free(dbval);
+			return;
+		}
+
+		intval = strtoll((char *) dbval, NULL, 10);
+		intval = intval + * (int64_t *) e->val;
+
+		if (dbvsize < 24) {
+			/* We know dbval is long enough because we've
+			 * allocated it, so we only change dbvsize */
+			dbvsize = 24;
+		}
+
+		snprintf((char *) dbval, dbvsize, "%23lld",
+				(long long int) intval);
+
+		rv = db_set(db, e->key, e->ksize, dbval, dbvsize);
+		/* done with dbval either way; free it before any return */
+		free(dbval);
+		if (!rv) {
+			e->req->reply_err(e->req, ERR_DB);
+			return;
+		}
+
+		e->req->mini_reply(e->req, REP_OK);
+
 	} else {
 		wlog("Unknown op 0x%x\n", e->operation);
 	}
diff --git a/nmdb/net-const.h b/nmdb/net-const.h
index 79d9553..b9e875a 100644
--- a/nmdb/net-const.h
+++ b/nmdb/net-const.h
@@ -37,6 +37,8 @@
 #define REQ_DEL_ASYNC		0x108
 #define REQ_CACHE_CAS		0x109
 #define REQ_CAS			0x110
+#define REQ_CACHE_INCR		0x111
+#define REQ_INCR		0x112
 
 /* Network replies (different namespace from requests) */
 #define REP_ERR			0x800
diff --git a/nmdb/parse.c b/nmdb/parse.c
index 1acd07f..2fad1e1 100644
--- a/nmdb/parse.c
+++ b/nmdb/parse.c
@@ -16,6 +16,7 @@ static void parse_get(struct req_info *req, int impact_db);
 static void parse_set(struct req_info *req, int impact_db, int async);
 static void parse_del(struct req_info *req, int impact_db, int async);
 static void parse_cas(struct req_info *req, int impact_db);
+static void parse_incr(struct req_info *req, int impact_db);
 
 
 /* Create a queue entry structure based on the parameters passed. Memory
@@ -178,6 +179,10 @@ int parse_message(struct req_info *req,
 		parse_cas(req, 0);
 	else if (cmd == REQ_CAS)
 		parse_cas(req, 1);
+	else if (cmd == REQ_CACHE_INCR)
+		parse_incr(req, 0);
+	else if (cmd == REQ_INCR)
+		parse_incr(req, 1);
 	else {
 		stats.net_unk_req++;
 		req->reply_err(req, ERR_UNKREQ);
@@ -437,3 +442,97 @@ static void parse_cas(struct req_info *req, int impact_db)
 }
 
 
+/* ntohll() is not standard, so we define it using an UGLY trick because there
+ * is no standard way to check for endianness at runtime! */
+static uint64_t ntohll(uint64_t x)
+{
+	static int endianness = 0;
+
+	/* determine the endianness by checking how htonl() behaves; use -1
+	 * for little endian and 1 for big endian */
+	if (endianness == 0) {
+		if (htonl(1) == 1)
+			endianness = 1;
+		else
+			endianness = -1;
+	}
+
+	if (endianness == 1) {
+		/* big endian */
+		return x;
+	}
+
+	/* little endian */
+	return ( ntohl( (x >> 32) & 0xFFFFFFFF ) |
+			( (uint64_t) ntohl(x & 0xFFFFFFFF) ) << 32 );
+}
+
+
+static void parse_incr(struct req_info *req, int impact_db)
+{
+	int cres;
+	const unsigned char *key;
+	uint32_t ksize;
+	int64_t increment;
+	const int max = 65536;
+
+	/* Request format:
+	 * 4		ksize
+	 * ksize	key
+	 * 8		increment (big endian int64_t)
+	 */
+	ksize = * (uint32_t *) req->payload;
+	ksize = ntohl(ksize);
+
+	/* Sanity check on sizes (ordered so that ksize + 8 cannot wrap):
+	 * - ksize + 8 must be <= 2^16 = 64k
+	 * - 4 (ksize field) + ksize + 8 must be <= req->psize
+	 */
+	if ( (ksize > (uint32_t) (max - 8)) || (req->psize < 4 + ksize + 8) ) {
+		stats.net_broken_req++;
+		req->reply_err(req, ERR_BROKEN);
+		return;
+	}
+
+	key = req->payload + sizeof(uint32_t);
+	increment = ntohll( * (int64_t *) (key + ksize) );
+
+	cres = cache_incr(cache_table, key, ksize, increment);
+	if (cres == -3) {
+		req->reply_err(req, ERR_MEM);
+		return;
+	} else if (cres == -2) {
+		/* the value was empty or not NUL terminated */
+		req->mini_reply(req, REP_NOMATCH);
+		return;
+	}
+
+	if (impact_db) {
+		struct queue_entry *e;
+
+		/* at this point, the cache_incr() was either successful or a
+		 * miss, but we don't really care */
+
+		e = make_queue_entry(req, REQ_INCR, key, ksize,
+				(unsigned char *) &increment,
+				sizeof(increment));
+		if (e == NULL) {
+			req->reply_err(req, ERR_MEM);
+			return;
+		}
+		queue_lock(op_queue);
+		queue_put(op_queue, e);
+		queue_unlock(op_queue);
+
+		queue_signal(op_queue);
+	} else {
+		if (cres == -1)
+			req->mini_reply(req, REP_NOTIN);
+		else
+			req->mini_reply(req, REP_OK);
+	}
+
+	return;
+}
+
+