git » nmdb » commit bc5e243

Implement the increment operation, server side.

author Alberto Bertogli
2007-08-28 06:13:12 UTC
committer Alberto Bertogli
2007-08-28 06:13:12 UTC
parent 0e2f733d3ae2c3d79fac26e7aacdfa282caec97b

Implement the increment operation, server side.

Signed-off-by: Alberto Bertogli <albertito@gmail.com>

nmdb/cache.c +60 -1
nmdb/cache.h +3 -0
nmdb/dbloop.c +47 -0
nmdb/net-const.h +2 -0
nmdb/parse.c +99 -0

diff --git a/nmdb/cache.c b/nmdb/cache.c
index 5d7a88b..9d1b898 100644
--- a/nmdb/cache.c
+++ b/nmdb/cache.c
@@ -6,11 +6,13 @@
  */
 
 #include <sys/types.h>		/* for size_t */
-#include <stdint.h>		/* for uint32_t */
+#include <stdint.h>		/* for [u]int*_t */
 #include <stdlib.h>		/* for malloc() */
 #include <string.h>		/* for memcpy()/memcmp() */
+#include <stdio.h>		/* snprintf() */
 #include "cache.h"
 
+
 struct cache *cache_create(size_t numobjs, unsigned int flags)
 {
 	size_t i;
@@ -352,3 +354,60 @@ exit:
 	return rv;
 }
 
+
+/* Increment the value associated with the given key by the given increment.
+ * The increment is a signed 64 bit value, and the value size must be >= 8
+ * bytes.
+ * Returns:
+ *    1 if the increment succeeded.
+ *   -1 if the value was not in the cache.
+ *   -2 if the value was not null terminated.
+ *   -3 if there was a memory error.
+ */
+int cache_incr(struct cache *cd, const unsigned char *key, size_t ksize,
+		int64_t increment)
+{
+	uint32_t h = 0;
+	unsigned char *val, *newval;
+	int64_t intval;
+	size_t vsize;
+	struct cache_chain *c;
+	struct cache_entry *e;
+
+	h = hash(key, ksize) % cd->hashlen;
+	c = cd->table + h;
+
+	e = find_in_chain(c, key, ksize);
+
+	if (e == NULL)
+		return -3;
+
+	val = e->val;
+	vsize = e->vsize;
+
+	/* the value must be a NULL terminated string, otherwise strtoll might
+	 * cause a segmentation fault */
+	if (val && val[vsize - 1] != '\0')
+		return -2;
+
+	intval = strtoll((char *) val, NULL, 10);
+	intval = intval + increment;
+
+	/* The max value for an unsigned long long is 18446744073709551615,
+	 * and strlen('18446744073709551615') = 20, so if the value is smaller
+	 * than 24 (just in case) we create a new buffer. */
+	if (vsize < 24) {
+		newval = malloc(24);
+		if (newval == NULL)
+			return -3;
+		free(val);
+		e->val = val = newval;
+		e->vsize = vsize = 24;
+	}
+
+	snprintf((char *) val, vsize, "%23lld", (long long int) intval);
+
+	return 1;
+}
+
+
diff --git a/nmdb/cache.h b/nmdb/cache.h
index 66c705d..c4d3d7f 100644
--- a/nmdb/cache.h
+++ b/nmdb/cache.h
@@ -5,6 +5,7 @@
 /* Generic cache layer. See cache.c for more information. */
 
 #include <sys/types.h>		/* for size_t */
+#include <stdint.h>		/* for int64_t */
 
 
 struct cache {
@@ -47,6 +48,8 @@ int cache_del(struct cache *cd, const unsigned char *key, size_t ksize);
 int cache_cas(struct cache *cd, const unsigned char *key, size_t ksize,
 		const unsigned char *oldval, size_t ovsize,
 		const unsigned char *newval, size_t nvsize);
+int cache_incr(struct cache *cd, const unsigned char *key, size_t ksize,
+		int64_t increment);
 
 #endif
 
diff --git a/nmdb/dbloop.c b/nmdb/dbloop.c
index 9a80c0d..11f4f11 100644
--- a/nmdb/dbloop.c
+++ b/nmdb/dbloop.c
@@ -4,6 +4,7 @@
 #include <errno.h>		/* ETIMEDOUT */
 #include <string.h>		/* memcmp() */
 #include <stdlib.h>		/* malloc()/free() */
+#include <stdio.h>		/* snprintf() */
 
 #include "common.h"
 #include "dbloop.h"
@@ -173,6 +174,52 @@ static void process_op(db_t *db, struct queue_entry *e)
 		e->req->reply_cas(e->req, REP_NOMATCH);
 		free(dbval);
 
+	} else if (e->operation == REQ_INCR) {
+		unsigned char *dbval;
+		size_t dbvsize = 64 * 1024;
+		int64_t intval;
+
+		dbval = malloc(dbvsize);
+		if (dbval == NULL) {
+			e->req->reply_err(e->req, ERR_MEM);
+			return;
+		}
+		rv = db_get(db, e->key, e->ksize, dbval, &dbvsize);
+		if (rv == 0) {
+			e->req->mini_reply(e->req, REP_NOTIN);
+			free(dbval);
+			return;
+		}
+
+		/* val must be NULL terminated; see cache_incr() */
+		if (dbval && dbval[dbvsize - 1] != '\0') {
+			e->req->mini_reply(e->req, REP_NOMATCH);
+			free(dbval);
+			return;
+		}
+
+		intval = strtoll((char *) dbval, NULL, 10);
+		intval = intval + * (int64_t *) e->val;
+
+		if (dbvsize < 24) {
+			/* We know dbval is long enough because we've
+			 * allocated it, so we only change dbvsize */
+			dbvsize = 24;
+		}
+
+		snprintf((char *) dbval, dbvsize, "%23lld",
+				(long long int) intval);
+
+		rv = db_set(db, e->key, e->ksize, dbval, dbvsize);
+		if (!rv) {
+			e->req->reply_err(e->req, ERR_DB);
+			return;
+		}
+
+		e->req->mini_reply(e->req, REP_OK);
+
+		free(dbval);
+
 	} else {
 		wlog("Unknown op 0x%x\n", e->operation);
 	}
diff --git a/nmdb/net-const.h b/nmdb/net-const.h
index 79d9553..b9e875a 100644
--- a/nmdb/net-const.h
+++ b/nmdb/net-const.h
@@ -37,6 +37,8 @@
 #define REQ_DEL_ASYNC		0x108
 #define REQ_CACHE_CAS		0x109
 #define REQ_CAS			0x110
+#define REQ_CACHE_INCR		0x111
+#define REQ_INCR		0x112
 
 /* Network replies (different namespace from requests) */
 #define REP_ERR			0x800
diff --git a/nmdb/parse.c b/nmdb/parse.c
index 1acd07f..2fad1e1 100644
--- a/nmdb/parse.c
+++ b/nmdb/parse.c
@@ -16,6 +16,7 @@ static void parse_get(struct req_info *req, int impact_db);
 static void parse_set(struct req_info *req, int impact_db, int async);
 static void parse_del(struct req_info *req, int impact_db, int async);
 static void parse_cas(struct req_info *req, int impact_db);
+static void parse_incr(struct req_info *req, int impact_db);
 
 
 /* Create a queue entry structure based on the parameters passed. Memory
@@ -178,6 +179,10 @@ int parse_message(struct req_info *req,
 		parse_cas(req, 0);
 	else if (cmd == REQ_CAS)
 		parse_cas(req, 1);
+	else if (cmd == REQ_CACHE_INCR)
+		parse_incr(req, 0);
+	else if (cmd == REQ_INCR)
+		parse_incr(req, 1);
 	else {
 		stats.net_unk_req++;
 		req->reply_err(req, ERR_UNKREQ);
@@ -437,3 +442,97 @@ static void parse_cas(struct req_info *req, int impact_db)
 }
 
 
+/* ntohll() is not standard, so we define it using an UGLY trick because there
+ * is no standard way to check for endianness at runtime! */
+static uint64_t ntohll(uint64_t x)
+{
+	static int endianness = 0;
+
+	/* determine the endianness by checking how htonl() behaves; use -1
+	 * for little endian and 1 for big endian */
+	if (endianness == 0) {
+		if (htonl(1) == 1)
+			endianness = 1;
+		else
+			endianness = -1;
+	}
+
+	if (endianness == 1) {
+		/* big endian */
+		return x;
+	}
+
+	/* little endian */
+	return ( ntohl( (x >> 32) & 0xFFFFFFFF ) | \
+			( (uint64_t) ntohl(x & 0xFFFFFFFF) ) << 32 );
+}
+
+
+static void parse_incr(struct req_info *req, int impact_db)
+{
+	int cres;
+	const unsigned char *key;
+	uint32_t ksize;
+	int64_t increment;
+	const int max = 65536;
+
+	/* Request format:
+	 * 4		ksize
+	 * ksize	key
+	 * 8		increment (big endian int64_t)
+	 */
+	ksize = * (uint32_t *) req->payload;
+	ksize = ntohl(ksize);
+
+	/* Sanity check on sizes:
+	 * - ksize + 8 must be < req->psize
+	 * - ksize + 8 must be < 2^16 = 64k
+	 */
+	if ( (req->psize < ksize + 8) || ((ksize + 8) > max)) {
+		stats.net_broken_req++;
+		req->reply_err(req, ERR_BROKEN);
+		return;
+	}
+
+	key = req->payload + sizeof(uint32_t);
+	increment = ntohll( * (int64_t *) (key + ksize) );
+
+	cres = cache_incr(cache_table, key, ksize, increment);
+	if (cres == -3) {
+		req->reply_err(req, ERR_MEM);
+		return;
+	} else if (cres == -2) {
+		/* the value was not NULL terminated */
+		req->mini_reply(req, REP_NOMATCH);
+		return;
+	}
+
+	if (impact_db) {
+		struct queue_entry *e;
+
+		/* at this point, the cache_incr() was either successful or a
+		 * miss, but we don't really care */
+
+		e = make_queue_entry(req, REQ_INCR, key, ksize,
+				(unsigned char *) &increment,
+				sizeof(increment));
+		if (e == NULL) {
+			req->reply_err(req, ERR_MEM);
+			return;
+		}
+		queue_lock(op_queue);
+		queue_put(op_queue, e);
+		queue_unlock(op_queue);
+
+		queue_signal(op_queue);
+	} else {
+		if (cres == -1)
+			req->mini_reply(req, REP_NOTIN);
+		else
+			req->mini_reply(req, REP_OK);
+	}
+
+	return;
+}
+
+