git » nmdb » commit 4641cc6

Isolate message parsing from network message receiving.

author Alberto Bertogli
2007-06-01 05:49:32 UTC
committer Alberto Bertogli
2007-06-01 05:49:32 UTC
parent 67f42aa3cda28b810f23cdbd55e990300634254c

Isolate message parsing from network message receiving.

This patch cleanly splits message parsing from message receiving.
While on TIPC the message received IS the one to parse, on TCP we have
a thin but troublesome message wrapper that we don't want to mix with
our real code.

It also simplifies the code a lot, and makes it easier to add new
protocols in the future.


Signed-off-by: Alberto Bertogli <albertito@gmail.com>

nmdb/Makefile +1 -1
nmdb/req.h +2 -1
nmdb/tcp.c +26 -430
nmdb/tipc.c +14 -433

diff --git a/nmdb/Makefile b/nmdb/Makefile
index 6b26d6f..a37dac3 100644
--- a/nmdb/Makefile
+++ b/nmdb/Makefile
@@ -18,7 +18,7 @@ endif
 PREFIX=/usr/local
 
 
-OBJS = be-qdbm.o cache.o db.o queue.o net.o tcp.o tipc.o main.o
+OBJS = be-qdbm.o cache.o db.o queue.o net.o parse.o tcp.o tipc.o main.o
 
 default: all
 
diff --git a/nmdb/req.h b/nmdb/req.h
index f8c7810..4c52d59 100644
--- a/nmdb/req.h
+++ b/nmdb/req.h
@@ -23,10 +23,11 @@ struct req_info {
 	/* operation information */
 	uint32_t id;
 	uint32_t cmd;
-	unsigned char *payload;
+	const unsigned char *payload;
 	size_t psize;
 
 	/* operations */
+	void (*mini_reply)(struct req_info *req, uint32_t reply);
 	void (*reply_err)(struct req_info *req, uint32_t reply);
 	void (*reply_get)(struct req_info *req, uint32_t reply,
 			unsigned char *val, size_t vsize);
diff --git a/nmdb/tcp.c b/nmdb/tcp.c
index 622611a..cde51f1 100644
--- a/nmdb/tcp.c
+++ b/nmdb/tcp.c
@@ -6,7 +6,6 @@
 #include <stdint.h>		/* uint32_t and friends */
 #include <arpa/inet.h>		/* htonls() and friends */
 #include <netinet/in.h>		/* INET stuff */
-//#include <netinet/ip.h>		/* IP stuff */
 #include <netinet/tcp.h>	/* TCP stuff */
 #include <string.h>		/* memcpy() */
 #include <unistd.h>		/* fcntl() */
@@ -21,9 +20,10 @@ typedef unsigned char u_char;
 
 #include "tcp.h"
 #include "common.h"
-#include "queue.h"
 #include "net-const.h"
 #include "req.h"
+#include "parse.h"
+
 
 #define printf(...) do { } while (0)
 
@@ -42,19 +42,16 @@ struct tcp_socket {
 };
 
 static void tcp_recv(int fd, short event, void *arg);
-static void parse_msg(struct tcp_socket *tcpsock, struct req_info *req,
+static void process_buf(struct tcp_socket *tcpsock, struct req_info *req,
 		unsigned char *buf, size_t len);
-static void parse_get(struct req_info *req, int impact_db);
-static void parse_set(struct req_info *req, int impact_db, int async);
-static void parse_del(struct req_info *req, int impact_db, int async);
-static void parse_cas(struct req_info *req, int impact_db);
 
-void tcp_reply_err(struct req_info *req, uint32_t reply);
-void tcp_reply_get(struct req_info *req, uint32_t reply,
+static void tcp_mini_reply(struct req_info *req, uint32_t reply);
+static void tcp_reply_err(struct req_info *req, uint32_t reply);
+static void tcp_reply_get(struct req_info *req, uint32_t reply,
 		unsigned char *val, size_t vsize);
-void tcp_reply_set(struct req_info *req, uint32_t reply);
-void tcp_reply_del(struct req_info *req, uint32_t reply);
-void tcp_reply_cas(struct req_info *req, uint32_t reply);
+static void tcp_reply_set(struct req_info *req, uint32_t reply);
+static void tcp_reply_del(struct req_info *req, uint32_t reply);
+static void tcp_reply_cas(struct req_info *req, uint32_t reply);
 
 
 /*
@@ -111,7 +108,7 @@ static int rep_send(const struct req_info *req, const unsigned char *buf,
 
 
 /* Send small replies, consisting in only a value. */
-static void mini_reply(struct req_info *req, uint32_t reply)
+void tcp_mini_reply(struct req_info *req, uint32_t reply)
 {
 	/* We use a mini buffer to speedup the small replies, to avoid the
 	 * malloc() overhead. */
@@ -128,99 +125,6 @@ static void mini_reply(struct req_info *req, uint32_t reply)
 }
 
 
-/* Create a queue entry structure based on the parameters passed. Memory
- * allocated here will be free()'d in queue_entry_free(). It's not the
- * cleanest way, but the alternatives are even messier. */
-static struct queue_entry *make_queue_long_entry(struct req_info *req,
-		uint32_t operation, const unsigned char *key, size_t ksize,
-		const unsigned char *val, size_t vsize,
-		const unsigned char *newval, size_t nvsize)
-{
-	struct queue_entry *e;
-	unsigned char *kcopy, *vcopy, *nvcopy;
-
-	e = queue_entry_create();
-	if (e == NULL) {
-		return NULL;
-	}
-
-	kcopy = NULL;
-	if (key != NULL) {
-		kcopy = malloc(ksize);
-		if (kcopy == NULL) {
-			queue_entry_free(e);
-			return NULL;
-		}
-		memcpy(kcopy, key, ksize);
-	}
-
-	vcopy = NULL;
-	if (val != NULL) {
-		vcopy = malloc(vsize);
-		if (vcopy == NULL) {
-			queue_entry_free(e);
-			if (kcopy != NULL)
-				free(kcopy);
-			return NULL;
-		}
-		memcpy(vcopy, val, vsize);
-	}
-
-	nvcopy = NULL;
-	if (newval != NULL) {
-		nvcopy = malloc(nvsize);
-		if (nvcopy == NULL) {
-			queue_entry_free(e);
-			if (kcopy != NULL)
-				free(kcopy);
-			if (vcopy != NULL)
-				free(vcopy);
-			return NULL;
-		}
-		memcpy(nvcopy, newval, nvsize);
-	}
-
-	e->operation = operation;
-	e->key = kcopy;
-	e->ksize = ksize;
-	e->val = vcopy;
-	e->vsize = vsize;
-	e->newval = nvcopy;
-	e->nvsize = nvsize;
-
-	/* Create a copy of req, including clisa */
-	e->req = malloc(sizeof(struct req_info));
-	if (e->req == NULL) {
-		queue_entry_free(e);
-		return NULL;
-	}
-	memcpy(e->req, req, sizeof(struct req_info));
-
-	e->req->clisa = malloc(sizeof(struct sockaddr_in));
-	if (e->req->clisa == NULL) {
-		queue_entry_free(e);
-		return NULL;
-	}
-	memcpy(e->req->clisa, req->clisa, sizeof(struct sockaddr_in));
-
-	/* clear out unused fields */
-	e->req->payload = NULL;
-	e->req->psize = 0;
-
-	return e;
-}
-
-/* Like make_queue_long_entry() but with few parameters because most actions
- * do not need newval. */
-static struct queue_entry *make_queue_entry(struct req_info *req,
-		uint32_t operation, const unsigned char *key, size_t ksize,
-		const unsigned char *val, size_t vsize)
-{
-	return make_queue_long_entry(req, operation, key, ksize, val, vsize,
-			NULL, 0);
-}
-
-
 void tcp_reply_err(struct req_info *req, uint32_t reply)
 {
 	rep_send_error(req, reply);
@@ -231,7 +135,7 @@ void tcp_reply_get(struct req_info *req, uint32_t reply,
 {
 	if (val == NULL) {
 		/* miss */
-		mini_reply(req, reply);
+		tcp_mini_reply(req, reply);
 	} else {
 		unsigned char *buf;
 		size_t bsize;
@@ -269,18 +173,18 @@ void tcp_reply_get(struct req_info *req, uint32_t reply,
 
 void tcp_reply_set(struct req_info *req, uint32_t reply)
 {
-	mini_reply(req, reply);
+	tcp_mini_reply(req, reply);
 }
 
 
 void tcp_reply_del(struct req_info *req, uint32_t reply)
 {
-	mini_reply(req, reply);
+	tcp_mini_reply(req, reply);
 }
 
 void tcp_reply_cas(struct req_info *req, uint32_t reply)
 {
-	mini_reply(req, reply);
+	tcp_mini_reply(req, reply);
 }
 
 
@@ -411,7 +315,7 @@ static void tcp_recv(int fd, short event, void *arg)
 		/* New incoming message */
 
 		/* Allocate a little bit more over the max. message size,
-		 * which is 64k; it will be freed by parse_msg(). */
+		 * which is 64k; it will be freed by process_buf(). */
 		bsize = 68 * 1024;
 		buf = malloc(bsize);
 		if (buf == NULL) {
@@ -430,7 +334,7 @@ static void tcp_recv(int fd, short event, void *arg)
 			goto error_exit;
 		}
 
-		/* parse_msg() will take care of freeing this when the time
+		/* process_buf() will take care of freeing this when the time
 		 * comes */
 		req = malloc(sizeof(struct req_info));
 		if (req == NULL) {
@@ -442,13 +346,14 @@ static void tcp_recv(int fd, short event, void *arg)
 		req->type = REQTYPE_TCP;
 		req->clisa = (struct sockaddr *) &tcpsock->clisa;
 		req->clilen = tcpsock->clilen;
+		req->mini_reply = tcp_mini_reply;
 		req->reply_err = tcp_reply_err;
 		req->reply_get = tcp_reply_get;
 		req->reply_set = tcp_reply_set;
 		req->reply_del = tcp_reply_del;
 		req->reply_cas = tcp_reply_cas;
 
-		parse_msg(tcpsock, req, buf, rv);
+		process_buf(tcpsock, req, buf, rv);
 
 	} else {
 		/* We already got a partial message, complete it. */
@@ -465,7 +370,7 @@ static void tcp_recv(int fd, short event, void *arg)
 
 		tcpsock->len += rv;
 
-		parse_msg(tcpsock, tcpsock->req, tcpsock->buf, tcpsock->len);
+		process_buf(tcpsock, tcpsock->req, tcpsock->buf, tcpsock->len);
 	}
 
 	return;
@@ -478,14 +383,11 @@ error_exit:
 }
 
 
-/* Main message parsing and dissecting */
-static void parse_msg(struct tcp_socket *tcpsock, struct req_info *req,
+/* Main message unwrapping */
+static void process_buf(struct tcp_socket *tcpsock, struct req_info *req,
 		unsigned char *buf, size_t len)
 {
-	uint32_t hdr, id, cmd, totaltoget = 0;
-	uint8_t ver;
-	size_t psize;
-	unsigned char *payload;
+	uint32_t totaltoget = 0;
 
 	printf("parse l:%tu tl:%tu tb:%p ts:%tu \n", len, tcpsock->len, tcpsock->buf, tcpsock->pktsize);
 
@@ -521,74 +423,19 @@ static void parse_msg(struct tcp_socket *tcpsock, struct req_info *req,
 			tcpsock->len = len;
 			tcpsock->pktsize = totaltoget;
 		}
-
 		return;
 	}
+
 	printf("parsing\n");
 
 	/* The buffer is complete, parse it as usual. */
 
-	/* The header is:
-	 * 4 bytes	Total message length
-	 * 4 bytes	Version + ID
-	 * 4 bytes	Command
-	 * Variable 	Payload
-	 */
-
-	hdr = * ((uint32_t *) buf + 1);
-	hdr = htonl(hdr);
-
-	/* FIXME: little endian-only */
-	ver = (hdr & 0xF0000000) >> 28;
-	id = hdr & 0x0FFFFFFF;
-	req->id = id;
-
-	cmd = ntohl(* ((uint32_t *) buf + 2));
-
-	if (ver != PROTO_VER) {
-		stats.net_version_mismatch++;
-		rep_send_error(req, ERR_VER);
+	if (parse_message(req, buf + 4, len - 4)) {
 		goto exit;
+	} else {
+		goto error_exit;
 	}
 
-	/* We define payload as the stuff after buf. But be careful because
-	 * there might be none (if len == 1). Doing the pointer arithmetic
-	 * isn't problematic, but accessing the payload should be done only if
-	 * we know we have enough data. */
-	payload = buf + 12;
-	psize = len - 12;
-
-	/* Store the id encoded in network byte order, so that we don't have
-	 * to calculate it at send time. */
-	req->id = htonl(id);
-	req->cmd = cmd;
-	req->payload = payload;
-	req->psize = psize;
-
-	if (cmd == REQ_CACHE_GET)
-		parse_get(req, 0);
-	else if (cmd == REQ_CACHE_SET)
-		parse_set(req, 0, 0);
-	else if (cmd == REQ_CACHE_DEL)
-		parse_del(req, 0, 0);
-	else if (cmd == REQ_GET)
-		parse_get(req, 1);
-	else if (cmd == REQ_SET_SYNC)
-		parse_set(req, 1, 0);
-	else if (cmd == REQ_DEL_SYNC)
-		parse_del(req, 1, 0);
-	else if (cmd == REQ_SET_ASYNC)
-		parse_set(req, 1, 1);
-	else if (cmd == REQ_DEL_ASYNC)
-		parse_del(req, 1, 1);
-	else if (cmd == REQ_CACHE_CAS)
-		parse_cas(req, 0);
-	else if (cmd == REQ_CAS)
-		parse_cas(req, 1);
-	else {
-		stats.net_unk_req++;
-		rep_send_error(req, ERR_UNKREQ);
-	}
 
 exit:
 	printf("pm exit\n");
@@ -607,7 +454,6 @@ exit:
 
 	return;
 
-
 error_exit:
 	printf("pm error\n");
 		printf("t:%p b:%p\n", tcpsock->buf, buf);
@@ -630,253 +476,3 @@ error_exit:
 }
 
 
-static void parse_get(struct req_info *req, int impact_db)
-{
-	int hit;
-	unsigned char *key;
-	uint32_t ksize;
-	unsigned char *val = NULL;
-	size_t vsize = 0;
-
-	ksize = * (uint32_t *) req->payload;
-	ksize = ntohl(ksize);
-	if (req->psize < ksize) {
-		stats.net_broken_req++;
-		rep_send_error(req, ERR_BROKEN);
-		return;
-	}
-
-	key = req->payload + sizeof(uint32_t);
-
-	hit = cache_get(cache_table, key, ksize, &val, &vsize);
-
-	if (!hit && !impact_db) {
-		mini_reply(req, REP_CACHE_MISS);
-		return;
-	} else if (!hit && impact_db) {
-		struct queue_entry *e;
-		e = make_queue_entry(req, REQ_GET, key, ksize, NULL, 0);
-		if (e == NULL) {
-			rep_send_error(req, ERR_MEM);
-			return;
-		}
-		queue_lock(op_queue);
-		queue_put(op_queue, e);
-		queue_unlock(op_queue);
-		queue_signal(op_queue);
-		return;
-	} else {
-		tcp_reply_get(req, REP_CACHE_HIT, val, vsize);
-		return;
-	}
-}
-
-
-static void parse_set(struct req_info *req, int impact_db, int async)
-{
-	int rv;
-	unsigned char *key, *val;
-	uint32_t ksize, vsize;
-	const int max = 65536;
-
-	/* Request format:
-	 * 4		ksize
-	 * 4		vsize
-	 * ksize	key
-	 * vsize	val
-	 */
-	ksize = * (uint32_t *) req->payload;
-	ksize = ntohl(ksize);
-	vsize = * ( ((uint32_t *) req->payload) + 1),
-	vsize = ntohl(vsize);
-
-	/* Sanity check on sizes:
-	 * - ksize and vsize must both be < req->psize
-	 * - ksize and vsize must both be < 2^16 = 64k
-	 * - ksize + vsize < 2^16 = 64k
-	 */
-	if ( (req->psize < ksize) || (req->psize < vsize) ||
-			(ksize > max) || (vsize > max) ||
-			( (ksize + vsize) > max) ) {
-		stats.net_broken_req++;
-		rep_send_error(req, ERR_BROKEN);
-		return;
-	}
-
-	key = req->payload + sizeof(uint32_t) * 2;
-	val = key + ksize;
-
-	rv = cache_set(cache_table, key, ksize, val, vsize);
-	if (!rv) {
-		rep_send_error(req, ERR_MEM);
-		return;
-	}
-
-	if (impact_db) {
-		struct queue_entry *e;
-		uint32_t request;
-
-		request = REQ_SET_SYNC;
-		if (async)
-			request = REQ_SET_ASYNC;
-
-		e = make_queue_entry(req, request, key, ksize, val, vsize);
-		if (e == NULL) {
-			rep_send_error(req, ERR_MEM);
-			return;
-		}
-		queue_lock(op_queue);
-		queue_put(op_queue, e);
-		queue_unlock(op_queue);
-
-		if (async) {
-			mini_reply(req, REP_OK);
-		} else {
-			/* Signal the DB thread it has work only if it's a
-			 * synchronous operation, asynchronous don't mind
-			 * waiting. It does have a measurable impact on
-			 * performance (2083847usec vs 2804973usec for sets on
-			 * "test2d 100000 10 10"). */
-			queue_signal(op_queue);
-		}
-		return;
-	} else {
-		mini_reply(req, REP_OK);
-	}
-
-	return;
-}
-
-
-static void parse_del(struct req_info *req, int impact_db, int async)
-{
-	int hit;
-	unsigned char *key;
-	uint32_t ksize;
-
-	ksize = * (uint32_t *) req->payload;
-	ksize = ntohl(ksize);
-	if (req->psize < ksize) {
-		stats.net_broken_req++;
-		rep_send_error(req, ERR_BROKEN);
-		return;
-	}
-
-	key = req->payload + sizeof(uint32_t);
-
-	hit = cache_del(cache_table, key, ksize);
-
-	if (!impact_db && hit) {
-		mini_reply(req, REP_OK);
-	} else if (!impact_db && !hit) {
-		mini_reply(req, REP_NOTIN);
-	} else if (impact_db) {
-		struct queue_entry *e;
-		uint32_t request;
-
-		request = REQ_DEL_SYNC;
-		if (async)
-			request = REQ_DEL_ASYNC;
-
-		e = make_queue_entry(req, request, key, ksize, NULL, 0);
-		if (e == NULL) {
-			rep_send_error(req, ERR_MEM);
-			return;
-		}
-		queue_lock(op_queue);
-		queue_put(op_queue, e);
-		queue_unlock(op_queue);
-
-		if (async) {
-			mini_reply(req, REP_OK);
-		} else {
-			/* See comment on parse_set(). */
-			queue_signal(op_queue);
-		}
-
-		return;
-	}
-
-	return;
-}
-
-static void parse_cas(struct req_info *req, int impact_db)
-{
-	int rv;
-	unsigned char *key, *oldval, *newval;
-	uint32_t ksize, ovsize, nvsize;
-	const int max = 65536;
-
-	/* Request format:
-	 * 4		ksize
-	 * 4		ovsize
-	 * 4		nvsize
-	 * ksize	key
-	 * ovsize	oldval
-	 * nvsize	newval
-	 */
-	ksize = * (uint32_t *) req->payload;
-	ksize = ntohl(ksize);
-	ovsize = * ( ((uint32_t *) req->payload) + 1);
-	ovsize = ntohl(ovsize);
-	nvsize = * ( ((uint32_t *) req->payload) + 2);
-	nvsize = ntohl(nvsize);
-
-	/* Sanity check on sizes:
-	 * - ksize, ovsize and nvsize must all be < req->psize
-	 * - ksize, ovsize and nvsize must all be < 2^16 = 64k
-	 * - ksize + ovsize + mvsize < 2^16 = 64k
-	 */
-	if ( (req->psize < ksize) || (req->psize < ovsize) ||
-				(req->psize < nvsize) ||
-			(ksize > max) || (ovsize > max) ||
-				(nvsize > max) ||
-			( (ksize + ovsize + nvsize) > max) ) {
-		stats.net_broken_req++;
-		rep_send_error(req, ERR_BROKEN);
-		return;
-	}
-
-	key = req->payload + sizeof(uint32_t) * 3;
-	oldval = key + ksize;
-	newval = oldval + ovsize;
-
-	rv = cache_cas(cache_table, key, ksize, oldval, ovsize,
-			newval, nvsize);
-	if (rv == 0) {
-		/* If the cache doesn't match, there is no need to bother the
-		 * DB even if we were asked to impact. */
-		mini_reply(req, REP_NOMATCH);
-		return;
-	}
-
-	if (!impact_db) {
-		if (rv == -1) {
-			mini_reply(req, REP_NOTIN);
-			return;
-		} else {
-			mini_reply(req, REP_OK);
-			return;
-		}
-	} else {
-		/* impact_db = 1 and the key is either not in the cache, or
-		 * cache_cas() was successful. We now need to queue the CAS in
-		 * the database. */
-		struct queue_entry *e;
-
-		e = make_queue_long_entry(req, REQ_CAS, key, ksize,
-				oldval, ovsize, newval, nvsize);
-		if (e == NULL) {
-			rep_send_error(req, ERR_MEM);
-			return;
-		}
-
-		queue_lock(op_queue);
-		queue_put(op_queue, e);
-		queue_unlock(op_queue);
-		queue_signal(op_queue);
-	}
-	return;
-}
-
-
diff --git a/nmdb/tipc.c b/nmdb/tipc.c
index d6a6fb0..c1b0059 100644
--- a/nmdb/tipc.c
+++ b/nmdb/tipc.c
@@ -11,24 +11,18 @@
 
 #include "tipc.h"
 #include "common.h"
-#include "queue.h"
 #include "net-const.h"
 #include "req.h"
+#include "parse.h"
 
 
-static void parse_msg(struct req_info *req, unsigned char *buf,
-		size_t bsize);
-static void parse_get(struct req_info *req, int impact_db);
-static void parse_set(struct req_info *req, int impact_db, int async);
-static void parse_del(struct req_info *req, int impact_db, int async);
-static void parse_cas(struct req_info *req, int impact_db);
-
-void tipc_reply_err(struct req_info *req, uint32_t reply);
-void tipc_reply_get(struct req_info *req, uint32_t reply,
+static void tipc_mini_reply(struct req_info *req, uint32_t reply);
+static void tipc_reply_err(struct req_info *req, uint32_t reply);
+static void tipc_reply_get(struct req_info *req, uint32_t reply,
 		unsigned char *val, size_t vsize);
-void tipc_reply_set(struct req_info *req, uint32_t reply);
-void tipc_reply_del(struct req_info *req, uint32_t reply);
-void tipc_reply_cas(struct req_info *req, uint32_t reply);
+static void tipc_reply_set(struct req_info *req, uint32_t reply);
+static void tipc_reply_del(struct req_info *req, uint32_t reply);
+static void tipc_reply_cas(struct req_info *req, uint32_t reply);
 
 
 /*
@@ -77,7 +71,7 @@ static int rep_send(const struct req_info *req, const unsigned char *buf,
 
 
 /* Send small replies, consisting in only a value. */
-static void mini_reply(struct req_info *req, uint32_t reply)
+static void tipc_mini_reply(struct req_info *req, uint32_t reply)
 {
 	/* We use a mini buffer to speedup the small replies, to avoid the
 	 * malloc() overhead. */
@@ -94,99 +88,6 @@ static void mini_reply(struct req_info *req, uint32_t reply)
 }
 
 
-/* Create a queue entry structure based on the parameters passed. Memory
- * allocated here will be free()'d in queue_entry_free(). It's not the
- * cleanest way, but the alternatives are even messier. */
-static struct queue_entry *make_queue_long_entry(struct req_info *req,
-		uint32_t operation, const unsigned char *key, size_t ksize,
-		const unsigned char *val, size_t vsize,
-		const unsigned char *newval, size_t nvsize)
-{
-	struct queue_entry *e;
-	unsigned char *kcopy, *vcopy, *nvcopy;
-
-	e = queue_entry_create();
-	if (e == NULL) {
-		return NULL;
-	}
-
-	kcopy = NULL;
-	if (key != NULL) {
-		kcopy = malloc(ksize);
-		if (kcopy == NULL) {
-			queue_entry_free(e);
-			return NULL;
-		}
-		memcpy(kcopy, key, ksize);
-	}
-
-	vcopy = NULL;
-	if (val != NULL) {
-		vcopy = malloc(vsize);
-		if (vcopy == NULL) {
-			queue_entry_free(e);
-			if (kcopy != NULL)
-				free(kcopy);
-			return NULL;
-		}
-		memcpy(vcopy, val, vsize);
-	}
-
-	nvcopy = NULL;
-	if (newval != NULL) {
-		nvcopy = malloc(nvsize);
-		if (nvcopy == NULL) {
-			queue_entry_free(e);
-			if (kcopy != NULL)
-				free(kcopy);
-			if (vcopy != NULL)
-				free(vcopy);
-			return NULL;
-		}
-		memcpy(nvcopy, newval, nvsize);
-	}
-
-	e->operation = operation;
-	e->key = kcopy;
-	e->ksize = ksize;
-	e->val = vcopy;
-	e->vsize = vsize;
-	e->newval = nvcopy;
-	e->nvsize = nvsize;
-
-	/* Create a copy of req, including clisa */
-	e->req = malloc(sizeof(struct req_info));
-	if (e->req == NULL) {
-		queue_entry_free(e);
-		return NULL;
-	}
-	memcpy(e->req, req, sizeof(struct req_info));
-
-	e->req->clisa = malloc(sizeof(struct sockaddr_tipc));
-	if (e->req->clisa == NULL) {
-		queue_entry_free(e);
-		return NULL;
-	}
-	memcpy(e->req->clisa, req->clisa, sizeof(struct sockaddr_tipc));
-
-	/* clear out unused fields */
-	e->req->payload = NULL;
-	e->req->psize = 0;
-
-	return e;
-}
-
-/* Like make_queue_long_entry() but with few parameters because most actions
- * do not need newval. */
-static struct queue_entry *make_queue_entry(struct req_info *req,
-		uint32_t operation, const unsigned char *key, size_t ksize,
-		const unsigned char *val, size_t vsize)
-{
-	return make_queue_long_entry(req, operation, key, ksize, val, vsize,
-			NULL, 0);
-}
-
-
 /* The tipc_reply_* functions are used by the db code to send the network
  * replies. */
 
@@ -200,7 +101,7 @@ void tipc_reply_get(struct req_info *req, uint32_t reply,
 {
 	if (val == NULL) {
 		/* miss */
-		mini_reply(req, reply);
+		tipc_mini_reply(req, reply);
 	} else {
 		unsigned char *buf;
 		size_t bsize;
@@ -234,18 +135,18 @@ void tipc_reply_get(struct req_info *req, uint32_t reply,
 
 void tipc_reply_set(struct req_info *req, uint32_t reply)
 {
-	mini_reply(req, reply);
+	tipc_mini_reply(req, reply);
 }
 
 
 void tipc_reply_del(struct req_info *req, uint32_t reply)
 {
-	mini_reply(req, reply);
+	tipc_mini_reply(req, reply);
 }
 
 void tipc_reply_cas(struct req_info *req, uint32_t reply)
 {
-	mini_reply(req, reply);
+	tipc_mini_reply(req, reply);
 }
 
 
@@ -325,6 +226,7 @@ void tipc_recv(int fd, short event, void *arg)
 	req.type = REQTYPE_TIPC;
 	req.clisa = (struct sockaddr *) &clisa;
 	req.clilen = clilen;
+	req.mini_reply = tipc_mini_reply;
 	req.reply_err = tipc_reply_err;
 	req.reply_get = tipc_reply_get;
 	req.reply_set = tipc_reply_set;
@@ -332,331 +234,10 @@ void tipc_recv(int fd, short event, void *arg)
 	req.reply_cas = tipc_reply_cas;
 
 	/* parse the message */
-	parse_msg(&req, buf, rv);
+	parse_message(&req, buf, rv);
 
 exit:
 	return;
 }
 
 
-/* Main message parsing and dissecting */
-static void parse_msg(struct req_info *req, unsigned char *buf, size_t bsize)
-{
-	uint32_t hdr, id, cmd;
-	uint8_t ver;
-	size_t psize;
-	unsigned char *payload;
-
-	hdr = * (uint32_t *) buf;
-	hdr = htonl(hdr);
-
-	/* FIXME: little endian-only */
-	ver = (hdr & 0xF0000000) >> 28;
-	id = hdr & 0x0FFFFFFF;
-	req->id = id;
-
-	cmd = ntohl(* ((uint32_t *) buf + 1));
-
-	if (ver != PROTO_VER) {
-		stats.net_version_mismatch++;
-		rep_send_error(req, ERR_VER);
-		return;
-	}
-
-	/* We define payload as the stuff after buf. But be careful because
-	 * there might be none (if bsize == 1). Doing the pointer arithmetic
-	 * isn't problematic, but accessing the payload should be done only if
-	 * we know we have enough data. */
-	payload = buf + 8;
-	psize = bsize - 8;
-
-	/* Store the id encoded in network byte order, so that we don't have
-	 * to calculate it at send time. */
-	req->id = htonl(id);
-	req->cmd = cmd;
-	req->payload = payload;
-	req->psize = psize;
-
-	if (cmd == REQ_CACHE_GET)
-		parse_get(req, 0);
-	else if (cmd == REQ_CACHE_SET)
-		parse_set(req, 0, 0);
-	else if (cmd == REQ_CACHE_DEL)
-		parse_del(req, 0, 0);
-	else if (cmd == REQ_GET)
-		parse_get(req, 1);
-	else if (cmd == REQ_SET_SYNC)
-		parse_set(req, 1, 0);
-	else if (cmd == REQ_DEL_SYNC)
-		parse_del(req, 1, 0);
-	else if (cmd == REQ_SET_ASYNC)
-		parse_set(req, 1, 1);
-	else if (cmd == REQ_DEL_ASYNC)
-		parse_del(req, 1, 1);
-	else if (cmd == REQ_CACHE_CAS)
-		parse_cas(req, 0);
-	else if (cmd == REQ_CAS)
-		parse_cas(req, 1);
-	else {
-		stats.net_unk_req++;
-		rep_send_error(req, ERR_UNKREQ);
-		return;
-	}
-
-	return;
-}
-
-
-static void parse_get(struct req_info *req, int impact_db)
-{
-	int hit;
-	unsigned char *key;
-	uint32_t ksize;
-	unsigned char *val = NULL;
-	size_t vsize = 0;
-
-	if (settings.passive)
-		return;
-
-	ksize = * (uint32_t *) req->payload;
-	ksize = ntohl(ksize);
-	if (req->psize < ksize) {
-		stats.net_broken_req++;
-		rep_send_error(req, ERR_BROKEN);
-		return;
-	}
-
-	key = req->payload + sizeof(uint32_t);
-
-	hit = cache_get(cache_table, key, ksize, &val, &vsize);
-
-	if (!hit && !impact_db) {
-		mini_reply(req, REP_CACHE_MISS);
-		return;
-	} else if (!hit && impact_db) {
-		struct queue_entry *e;
-		e = make_queue_entry(req, REQ_GET, key, ksize, NULL, 0);
-		if (e == NULL) {
-			rep_send_error(req, ERR_MEM);
-			return;
-		}
-		queue_lock(op_queue);
-		queue_put(op_queue, e);
-		queue_unlock(op_queue);
-		queue_signal(op_queue);
-		return;
-	} else {
-		tipc_reply_get(req, REP_CACHE_HIT, val, vsize);
-		return;
-	}
-}
-
-
-static void parse_set(struct req_info *req, int impact_db, int async)
-{
-	int rv;
-	unsigned char *key, *val;
-	uint32_t ksize, vsize;
-	const int max = 65536;
-
-	/* Request format:
-	 * 4		ksize
-	 * 4		vsize
-	 * ksize	key
-	 * vsize	val
-	 */
-	ksize = * (uint32_t *) req->payload;
-	ksize = ntohl(ksize);
-	vsize = * ( ((uint32_t *) req->payload) + 1),
-	vsize = ntohl(vsize);
-
-	/* Sanity check on sizes:
-	 * - ksize and vsize must both be < req->psize
-	 * - ksize and vsize must both be < 2^16 = 64k
-	 * - ksize + vsize < 2^16 = 64k
-	 */
-	if ( (req->psize < ksize) || (req->psize < vsize) ||
-			(ksize > max) || (vsize > max) ||
-			( (ksize + vsize) > max) ) {
-		stats.net_broken_req++;
-		rep_send_error(req, ERR_BROKEN);
-		return;
-	}
-
-	key = req->payload + sizeof(uint32_t) * 2;
-	val = key + ksize;
-
-	rv = cache_set(cache_table, key, ksize, val, vsize);
-	if (!rv) {
-		rep_send_error(req, ERR_MEM);
-		return;
-	}
-
-	if (impact_db) {
-		struct queue_entry *e;
-		uint32_t request;
-
-		request = REQ_SET_SYNC;
-		if (async)
-			request = REQ_SET_ASYNC;
-
-		e = make_queue_entry(req, request, key, ksize, val, vsize);
-		if (e == NULL) {
-			rep_send_error(req, ERR_MEM);
-			return;
-		}
-		queue_lock(op_queue);
-		queue_put(op_queue, e);
-		queue_unlock(op_queue);
-
-		if (async) {
-			mini_reply(req, REP_OK);
-		} else {
-			/* Signal the DB thread it has work only if it's a
-			 * synchronous operation, asynchronous don't mind
-			 * waiting. It does have a measurable impact on
-			 * performance (2083847usec vs 2804973usec for sets on
-			 * "test2d 100000 10 10"). */
-			queue_signal(op_queue);
-		}
-		return;
-	} else {
-		mini_reply(req, REP_OK);
-	}
-
-	return;
-}
-
-
-static void parse_del(struct req_info *req, int impact_db, int async)
-{
-	int hit;
-	unsigned char *key;
-	uint32_t ksize;
-
-	ksize = * (uint32_t *) req->payload;
-	ksize = ntohl(ksize);
-	if (req->psize < ksize) {
-		stats.net_broken_req++;
-		rep_send_error(req, ERR_BROKEN);
-		return;
-	}
-
-	key = req->payload + sizeof(uint32_t);
-
-	hit = cache_del(cache_table, key, ksize);
-
-	if (!impact_db && hit) {
-		mini_reply(req, REP_OK);
-	} else if (!impact_db && !hit) {
-		mini_reply(req, REP_NOTIN);
-	} else if (impact_db) {
-		struct queue_entry *e;
-		uint32_t request;
-
-		request = REQ_DEL_SYNC;
-		if (async)
-			request = REQ_DEL_ASYNC;
-
-		e = make_queue_entry(req, request, key, ksize, NULL, 0);
-		if (e == NULL) {
-			rep_send_error(req, ERR_MEM);
-			return;
-		}
-		queue_lock(op_queue);
-		queue_put(op_queue, e);
-		queue_unlock(op_queue);
-
-		if (async) {
-			mini_reply(req, REP_OK);
-		} else {
-			/* See comment on parse_set(). */
-			queue_signal(op_queue);
-		}
-
-		return;
-	}
-
-	return;
-}
-
-static void parse_cas(struct req_info *req, int impact_db)
-{
-	int rv;
-	unsigned char *key, *oldval, *newval;
-	uint32_t ksize, ovsize, nvsize;
-	const int max = 65536;
-
-	/* Request format:
-	 * 4		ksize
-	 * 4		ovsize
-	 * 4		nvsize
-	 * ksize	key
-	 * ovsize	oldval
-	 * nvsize	newval
-	 */
-	ksize = * (uint32_t *) req->payload;
-	ksize = ntohl(ksize);
-	ovsize = * ( ((uint32_t *) req->payload) + 1);
-	ovsize = ntohl(ovsize);
-	nvsize = * ( ((uint32_t *) req->payload) + 2);
-	nvsize = ntohl(nvsize);
-
-	/* Sanity check on sizes:
-	 * - ksize, ovsize and nvsize must all be < req->psize
-	 * - ksize, ovsize and nvsize must all be < 2^16 = 64k
-	 * - ksize + ovsize + mvsize < 2^16 = 64k
-	 */
-	if ( (req->psize < ksize) || (req->psize < ovsize) ||
-				(req->psize < nvsize) ||
-			(ksize > max) || (ovsize > max) ||
-				(nvsize > max) ||
-			( (ksize + ovsize + nvsize) > max) ) {
-		stats.net_broken_req++;
-		rep_send_error(req, ERR_BROKEN);
-		return;
-	}
-
-	key = req->payload + sizeof(uint32_t) * 3;
-	oldval = key + ksize;
-	newval = oldval + ovsize;
-
-	rv = cache_cas(cache_table, key, ksize, oldval, ovsize,
-			newval, nvsize);
-	if (rv == 0) {
-		/* If the cache doesn't match, there is no need to bother the
-		 * DB even if we were asked to impact. */
-		mini_reply(req, REP_NOMATCH);
-		return;
-	}
-
-	if (!impact_db) {
-		if (rv == -1) {
-			mini_reply(req, REP_NOTIN);
-			return;
-		} else {
-			mini_reply(req, REP_OK);
-			return;
-		}
-	} else {
-		/* impact_db = 1 and the key is either not in the cache, or
-		 * cache_cas() was successful. We now need to queue the CAS in
-		 * the database. */
-		struct queue_entry *e;
-
-		e = make_queue_long_entry(req, REQ_CAS, key, ksize,
-				oldval, ovsize, newval, nvsize);
-		if (e == NULL) {
-			rep_send_error(req, ERR_MEM);
-			return;
-		}
-
-		queue_lock(op_queue);
-		queue_put(op_queue, e);
-		queue_unlock(op_queue);
-		queue_signal(op_queue);
-	}
-	return;
-}
-
-