git » nmdb » commit 2c23a7b

Implement CAS in nmdb.

author Alberto Bertogli
2007-04-29 06:37:09 UTC
committer Alberto Bertogli
2007-04-29 06:37:09 UTC
parent 0dea87c6521681c2e2eaf265ef631304ebca4fea

Implement CAS in nmdb.

This patch implements the compare-and-swap operation in the server.
The following patches will extend the C library and its bindings to
support it, and add proper documentation.

nmdb/cache.c +49 -0
nmdb/cache.h +3 -0
nmdb/db.c +35 -0
nmdb/net-const.h +4 -0
nmdb/queue.c +4 -0
nmdb/queue.h +2 -0
nmdb/tipc.c +119 -4
nmdb/tipc.h +1 -0

diff --git a/nmdb/cache.c b/nmdb/cache.c
index 27f426f..59538b3 100644
--- a/nmdb/cache.c
+++ b/nmdb/cache.c
@@ -307,3 +307,52 @@ exit:
 	return rv;
 }
 
+
+/* Performs a cache compare-and-swap: if key is cached and its value is
+ * byte-for-byte equal to oldval, it is replaced with a copy of newval.
+ * Returns -2 if there was an error, -1 if the key is not in the cache, 0 if
+ * the old value does not match, and 1 if the CAS was successful. */
+int cache_cas(struct cache *cd, const unsigned char *key, size_t ksize,
+		const unsigned char *oldval, size_t ovsize,
+		const unsigned char *newval, size_t nvsize)
+{
+	int rv;
+	uint32_t h;
+	struct cache_chain *c;
+	struct cache_entry *e;
+	unsigned char *buf;
+
+	h = hash(key, ksize) % cd->hashlen;
+	c = cd->table + h;
+
+	e = find_in_chain(c, key, ksize);
+	if (e == NULL) {
+		rv = -1;
+		goto exit;
+	}
+
+	if (e->vsize != ovsize || memcmp(e->val, oldval, ovsize) != 0) {
+		rv = 0;
+		goto exit;
+	}
+
+	/* malloc(0) may return NULL without having failed; ask for at least
+	 * one byte so an empty newval is not reported as an error. */
+	buf = malloc(nvsize > 0 ? nvsize : 1);
+	if (buf == NULL) {
+		rv = -2;
+		goto exit;
+	}
+
+	/* The entry is only modified once the copy exists, so on error
+	 * the cached value is left untouched. */
+	memcpy(buf, newval, nvsize);
+	free(e->val);
+	e->val = buf;
+	e->vsize = nvsize;
+	rv = 1;
+
+exit:
+	return rv;
+}
+
diff --git a/nmdb/cache.h b/nmdb/cache.h
index fd31900..66c705d 100644
--- a/nmdb/cache.h
+++ b/nmdb/cache.h
@@ -44,6 +44,9 @@ int cache_get(struct cache *cd, const unsigned char *key, size_t ksize,
 int cache_set(struct cache *cd, const unsigned char *k, size_t ksize,
 		const unsigned char *v, size_t vsize);
 int cache_del(struct cache *cd, const unsigned char *key, size_t ksize);
+int cache_cas(struct cache *cd, const unsigned char *key, size_t ksize,
+		const unsigned char *oldval, size_t ovsize,
+		const unsigned char *newval, size_t nvsize);
 
 #endif
 
diff --git a/nmdb/db.c b/nmdb/db.c
index 0589bc2..5ebdfac 100644
--- a/nmdb/db.c
+++ b/nmdb/db.c
@@ -3,6 +3,7 @@
 #include <time.h>		/* nanosleep() */
 #include <errno.h>		/* ETIMEDOUT */
 #include <stdio.h>		/* perror() */
+#include <string.h>		/* memcmp() */
 
 #include "common.h"
 #include "db.h"
@@ -136,6 +137,40 @@ static void process_op(db_t *db, struct queue_entry *e)
 	} else if (e->operation == REQ_DEL_ASYNC) {
 		db_del(db, e->key, e->ksize);
 
+	} else if (e->operation == REQ_CAS) {
+		/* Store e->newval only if the current db value equals e->val.
+		 * The buffer size matches the 64k limit parse_cas() checks. */
+		size_t dbvsize = 64 * 1024;
+		unsigned char *dbval = malloc(dbvsize);
+		int match;
+
+		if (dbval == NULL) {
+			tipc_reply_err(e->req, ERR_MEM);
+			return;
+		}
+
+		/* Compare; dbval is freed before any reply so no path leaks it */
+		rv = db_get(db, e->key, e->ksize, dbval, &dbvsize);
+		match = rv != 0 && e->vsize == dbvsize &&
+			memcmp(e->val, dbval, dbvsize) == 0;
+		free(dbval);
+		if (rv == 0) {
+			tipc_reply_cas(e->req, REP_NOTIN);
+			return;
+		}
+
+		if (match) {
+			/* Swap */
+			rv = db_set(db, e->key, e->ksize, e->newval, e->nvsize);
+			if (!rv) {
+				tipc_reply_err(e->req, ERR_DB);
+				return;
+			}
+			tipc_reply_cas(e->req, REP_OK);
+			return;
+		}
+		tipc_reply_cas(e->req, REP_NOMATCH);
+
 	} else {
 		printf("Unknown op 0x%x\n", e->operation);
 	}
diff --git a/nmdb/net-const.h b/nmdb/net-const.h
index a382490..127b9b9 100644
--- a/nmdb/net-const.h
+++ b/nmdb/net-const.h
@@ -23,6 +23,8 @@
 #define REQ_DEL_SYNC		0x106
 #define REQ_SET_ASYNC		0x107
 #define REQ_DEL_ASYNC		0x108
+#define REQ_CACHE_CAS		0x109	/* CAS on the cache only */
+#define REQ_CAS			0x110	/* CAS on the cache and the db */
 
 /* Network replies (different namespace from requests) */
 #define REP_ERR			0x800
@@ -30,6 +32,7 @@
 #define REP_CACHE_MISS		0x802
 #define REP_OK			0x803
 #define REP_NOTIN		0x804
+#define REP_NOMATCH		0x805	/* CAS: current value != old value */
 
 /* Network error replies */
 #define ERR_VER			0x101	/* Version mismatch */
@@ -39,5 +42,6 @@
 #define ERR_MEM			0x105	/* Memory allocation error */
 #define ERR_DB			0x106	/* Database error */
 
+
 #endif
 
diff --git a/nmdb/queue.c b/nmdb/queue.c
index 3afbde5..eb56b09 100644
--- a/nmdb/queue.c
+++ b/nmdb/queue.c
@@ -77,8 +77,10 @@ struct queue_entry *queue_entry_create(void)
 	e->operation = 0;
 	e->key = NULL;
 	e->val = NULL;
+	e->newval = NULL;
 	e->ksize = 0;
 	e->vsize = 0;
+	e->nvsize = 0;
 	e->prev = NULL;
 
 	return e;
@@ -93,6 +95,8 @@ void queue_entry_free(struct queue_entry *e) {
 		free(e->key);
 	if (e->val)
 		free(e->val);
+	if (e->newval)
+		free(e->newval);
 	free(e);
 	return;
 }
diff --git a/nmdb/queue.h b/nmdb/queue.h
index 69a0002..93e7227 100644
--- a/nmdb/queue.h
+++ b/nmdb/queue.h
@@ -20,8 +20,10 @@ struct queue_entry {
 
 	unsigned char *key;
 	unsigned char *val;
+	unsigned char *newval;
 	size_t ksize;
 	size_t vsize;
+	size_t nvsize;
 
 	struct queue_entry *prev;
 	/* A pointer to the next element on the list is actually not
diff --git a/nmdb/tipc.c b/nmdb/tipc.c
index 3a216bd..f6f6e3c 100644
--- a/nmdb/tipc.c
+++ b/nmdb/tipc.c
@@ -19,7 +19,7 @@ static void parse_msg(struct req_info *req, unsigned char *buf,
 static void parse_get(struct req_info *req, int impact_db);
 static void parse_set(struct req_info *req, int impact_db, int async);
 static void parse_del(struct req_info *req, int impact_db, int async);
-
+static void parse_cas(struct req_info *req, int impact_db);
 
 /*
  * Miscelaneous helper functions
@@ -89,12 +89,13 @@ static void mini_reply(struct req_info *req, uint32_t reply)
 /* Create a queue entry structure based on the parameters passed. Memory
  * allocated here will be free()'d in queue_entry_free(). It's not the
  * cleanest way, but the alternatives are even messier. */
-static struct queue_entry *make_queue_entry(struct req_info *req,
+static struct queue_entry *make_queue_long_entry(struct req_info *req,
 		uint32_t operation, const unsigned char *key, size_t ksize,
-		const unsigned char *val, size_t vsize)
+		const unsigned char *val, size_t vsize,
+		const unsigned char *newval, size_t nvsize)
 {
 	struct queue_entry *e;
-	unsigned char *kcopy, *vcopy;
+	unsigned char *kcopy, *vcopy, *nvcopy;
 
 	e = queue_entry_create();
 	if (e == NULL) {
@@ -123,11 +124,27 @@ static struct queue_entry *make_queue_entry(struct req_info *req,
 		memcpy(vcopy, val, vsize);
 	}
 
+	nvcopy = NULL;
+	if (newval != NULL) {
+		nvcopy = malloc(nvsize);
+		if (nvcopy == NULL) {
+			queue_entry_free(e);
+			if (kcopy != NULL)
+				free(kcopy);
+			if (vcopy != NULL)
+				free(vcopy);
+			return NULL;
+		}
+		memcpy(nvcopy, newval, nvsize);
+	}
+
 	e->operation = operation;
 	e->key = kcopy;
 	e->ksize = ksize;
 	e->val = vcopy;
 	e->vsize = vsize;
+	e->newval = nvcopy;
+	e->nvsize = nvsize;
 
 	/* Create a copy of req, including clisa */
 	e->req = malloc(sizeof(struct req_info));
@@ -151,6 +168,16 @@ static struct queue_entry *make_queue_entry(struct req_info *req,
 	return e;
 }
 
+/* Shorthand for make_queue_long_entry() used by every operation that has no
+ * new value to carry: newval is always NULL and nvsize is always 0. */
+static struct queue_entry *make_queue_entry(struct req_info *req,
+		uint32_t operation, const unsigned char *key, size_t ksize,
+		const unsigned char *val, size_t vsize)
+{
+	return make_queue_long_entry(req, operation, key, ksize,
+			val, vsize, NULL, 0);
+}
+
 
 /* The tipc_reply_* functions are used by the db code to send the network
  * replies. */
@@ -208,6 +235,11 @@ void tipc_reply_del(struct req_info *req, uint32_t reply)
 	mini_reply(req, reply);
 }
 
+void tipc_reply_cas(struct req_info *req, uint32_t reply)
+{
+	mini_reply(req, reply);	/* same path as tipc_reply_del(): code only */
+}
+
 
 /*
  * Main functions for receiving and parsing
@@ -339,6 +371,10 @@ static void parse_msg(struct req_info *req, unsigned char *buf, size_t bsize)
 		parse_set(req, 1, 1);
 	else if (cmd == REQ_DEL_ASYNC)
 		parse_del(req, 1, 1);
+	else if (cmd == REQ_CACHE_CAS)
+		parse_cas(req, 0);
+	else if (cmd == REQ_CAS)
+		parse_cas(req, 1);
 	else {
 		stats.net_unk_req++;
 		rep_send_error(req, ERR_UNKREQ);
@@ -522,4 +558,83 @@ static void parse_del(struct req_info *req, int impact_db, int async)
 	return;
 }
 
+/* Parses a CAS request and runs it on the cache; if impact_db is set, the
+ * CAS is then queued to be performed on the database as well. */
+static void parse_cas(struct req_info *req, int impact_db)
+{
+	int rv;
+	unsigned char *key, *oldval, *newval;
+	uint32_t ksize, ovsize, nvsize;
+	const uint32_t max = 65536;
+	struct queue_entry *e;
+
+	/* Request format:
+	 * 4		ksize
+	 * 4		ovsize
+	 * 4		nvsize
+	 * ksize	key
+	 * ovsize	oldval
+	 * nvsize	newval
+	 */
+	if (req->psize < sizeof(uint32_t) * 3)
+		goto broken;
+	ksize = ntohl(* (uint32_t *) req->payload);
+	ovsize = ntohl(* ( ((uint32_t *) req->payload) + 1));
+	nvsize = ntohl(* ( ((uint32_t *) req->payload) + 2));
+
+	/* Sanity check on sizes, which come from the network:
+	 * - ksize, ovsize and nvsize must all be <= 64k (checked first,
+	 *   so that their sum below cannot overflow)
+	 * - ksize + ovsize + nvsize <= 64k
+	 * - the header, key and both values must fit in req->psize
+	 */
+	if ( (ksize > max) || (ovsize > max) || (nvsize > max) ||
+			( (ksize + ovsize + nvsize) > max) ||
+			(req->psize < sizeof(uint32_t) * 3 +
+				ksize + ovsize + nvsize) )
+		goto broken;
+
+	key = req->payload + sizeof(uint32_t) * 3;
+	oldval = key + ksize;
+	newval = oldval + ovsize;
+
+	rv = cache_cas(cache_table, key, ksize, oldval, ovsize,
+			newval, nvsize);
+	if (rv == 0) {
+		/* If the cache doesn't match, there is no need to bother the
+		 * DB even if we were asked to impact. */
+		mini_reply(req, REP_NOMATCH);
+		return;
+	} else if (rv == -2) {
+		/* The cached value could not be replaced; don't queue the DB
+		 * CAS either, or the cache and the DB would disagree. */
+		rep_send_error(req, ERR_MEM);
+		return;
+	}
+
+	if (!impact_db) {
+		mini_reply(req, rv == -1 ? REP_NOTIN : REP_OK);
+		return;
+	}
+
+	/* impact_db = 1 and the key is either not in the cache, or
+	 * cache_cas() was successful. We now need to queue the CAS in
+	 * the database. */
+	e = make_queue_long_entry(req, REQ_CAS, key, ksize,
+			oldval, ovsize, newval, nvsize);
+	if (e == NULL) {
+		rep_send_error(req, ERR_MEM);
+		return;
+	}
+
+	queue_lock(op_queue);
+	queue_put(op_queue, e);
+	queue_unlock(op_queue);
+	queue_signal(op_queue);
+	return;
+broken:
+	stats.net_broken_req++;
+	rep_send_error(req, ERR_BROKEN);
+}
+
 
diff --git a/nmdb/tipc.h b/nmdb/tipc.h
index a717836..a743000 100644
--- a/nmdb/tipc.h
+++ b/nmdb/tipc.h
@@ -29,6 +29,7 @@ void tipc_reply_get(struct req_info *req, uint32_t reply,
 		unsigned char *val, size_t vsize);
 void tipc_reply_set(struct req_info *req, uint32_t reply);
 void tipc_reply_del(struct req_info *req, uint32_t reply);
+void tipc_reply_cas(struct req_info *req, uint32_t reply);
 
 #endif