git » nmdb » commit affed3d

Improve libnmdb's TCP support.

author Alberto Bertogli
2007-06-04 02:06:20 UTC
committer Alberto Bertogli
2007-06-04 02:06:20 UTC
parent a418111dc0019dc4d3f82ff1879de93ee9ff1597

Improve libnmdb's TCP support.

Until now, we did two send()s and two recv()s: one for the message length,
and another for the message itself. This slowed TCP down a lot.

This patch makes recv() handle the common case where we get the whole
thing in one go, and adds generic send() infrastructure to handle
transport protocols which need room reserved for a header before sending
the packet.

This improves TCP performance significantly, making it one third faster
than before, although it is still slower than TIPC on localhost.

Tested with all testcases and valgrind.

Signed-off-by: Alberto Bertogli <albertito@gmail.com>

libnmdb/internal.h +5 -0
libnmdb/libnmdb.c +68 -40
libnmdb/tcp.c +47 -21

diff --git a/libnmdb/internal.h b/libnmdb/internal.h
index 8df2b47..fac8d8e 100644
--- a/libnmdb/internal.h
+++ b/libnmdb/internal.h
@@ -11,6 +11,11 @@
  *  * are implemented. */
 #define ID_CODE 1
 
+/* How many bytes at the start of a request buffer the generic library code
+ * leaves free for a transport header (TCP puts the message length there). */
+#define TIPC_MSG_OFFSET 0
+#define TCP_MSG_OFFSET 4
+
 /* Functions used internally but shared among the different files. */
 int compare_servers(const void *s1, const void *s2);
 ssize_t srecv(int fd, unsigned char *buf, size_t count, int flags);
diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c
index 11fa73f..e3fd173 100644
--- a/libnmdb/libnmdb.c
+++ b/libnmdb/libnmdb.c
@@ -174,6 +174,19 @@ static uint32_t get_rep(struct nmdb_srv *srv,
 		return 0;
 }
 
+/* Number of bytes a request buffer must leave free at its start for srv's
+ * transport header; 0 when srv is NULL or of an unrecognized type. */
+static int srv_get_msg_offset(struct nmdb_srv *srv)
+{
+	int off = 0;
+
+	if (srv != NULL && srv->type == TIPC_CONN)
+		off = TIPC_MSG_OFFSET;
+	else if (srv != NULL && srv->type == TCP_CONN)
+		off = TCP_MSG_OFFSET;
+
+	return off;
+}
 
 /* Hash function used internally by select_srv(). See RFC 1071. */
 static uint32_t checksum(const unsigned char *buf, size_t bsize)
@@ -212,6 +225,7 @@ static ssize_t do_get(nmdb_t *db,
 		const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t vsize, int impact_db)
 {
+	int moff;
 	ssize_t rv, t;
 	unsigned char *buf, *p;
 	size_t bsize, reqsize, psize = 0;
@@ -224,29 +238,32 @@ static ssize_t do_get(nmdb_t *db,
 		request = REQ_CACHE_GET;
 	}
 
+	srv = select_srv(db, key, ksize);
+	moff = srv_get_msg_offset(srv);
+
 	/* Use the same buffer for the request and the reply.
 	 * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize,
 	 * 		ksize bytes key.
 	 * Reply: 4 bytes id, 4 bytes reply code, 4 bytes vsize,
 	 * 		vsize bytes key.
 	 *
-	 * We don't know vsize beforehand, but we do know TIPC's max packet is
-	 * 66000. We malloc 70k just in case.
+	 * We don't know vsize beforehand, but we do know our max packet size
+	 * is 64kb. We malloc 68kb just in case.
 	 */
-	bsize = 70 * 1024;
+	bsize = 68 * 1024;
 	buf = malloc(bsize);
 	if (buf == NULL)
 		return -1;
 
-	* (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE );
-	* ((uint32_t *) buf + 1) = htonl(request);
-	* ((uint32_t *) buf + 2) = htonl(ksize);
-	p = buf + 3 * 4;
-	memcpy(p, key, ksize);
+	p = buf + moff;
+
+	* (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE );
+	* ((uint32_t *) p + 1) = htonl(request);
+	* ((uint32_t *) p + 2) = htonl(ksize);
+	memcpy(p + 3 * 4, key, ksize);
 	reqsize = 3 * 4 + ksize;
 
-	srv = select_srv(db, key, ksize);
-	t = srv_send(srv, buf, reqsize);
+	t = srv_send(srv, buf, moff + reqsize);
 	if (t <= 0) {
 		rv = -1;
 		goto exit;
@@ -298,6 +315,7 @@ static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize,
 		const unsigned char *val, size_t vsize,
 		int impact_db, int async)
 {
+	int moff;
 	ssize_t rv, t;
 	unsigned char *buf, *p;
 	size_t bsize;
@@ -313,27 +331,28 @@ static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize,
 		request = REQ_CACHE_SET;
 	}
 
+	srv = select_srv(db, key, ksize);
+	moff = srv_get_msg_offset(srv);
 
 	/* Use the same buffer for the request and the reply.
 	 * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, 4
 	 *		bytes vsize, ksize bytes key, vsize bytes val.
 	 * Reply: 4 bytes id, 4 bytes reply code.
 	 */
-	bsize = 4 + 4 + 4 + 4 + ksize + vsize;
+	bsize = moff + 4 + 4 + 4 + 4 + ksize + vsize;
 	buf = malloc(bsize);
 	if (buf == NULL)
 		return -1;
 
-	* (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE );
-	* ((uint32_t *) buf + 1) = htonl(request);
-	* ((uint32_t *) buf + 2) = htonl(ksize);
-	* ((uint32_t *) buf + 3) = htonl(vsize);
-	p = buf + 4 * 4;
-	memcpy(p, key, ksize);
-	p += ksize;
-	memcpy(p, val, vsize);
+	p = buf + moff;
+
+	* (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE );
+	* ((uint32_t *) p + 1) = htonl(request);
+	* ((uint32_t *) p + 2) = htonl(ksize);
+	* ((uint32_t *) p + 3) = htonl(vsize);
+	memcpy(p + 4 * 4, key, ksize);
+	memcpy(p + 4 * 4 + ksize, val, vsize);
 
-	srv = select_srv(db, key, ksize);
 	t = srv_send(srv, buf, bsize);
 	if (t <= 0) {
 		rv = -1;
@@ -379,8 +398,9 @@ int nmdb_cache_set(nmdb_t *db, const unsigned char *key, size_t ksize,
 static int do_del(nmdb_t *db, const unsigned char *key, size_t ksize,
 		int impact_db, int async)
 {
+	int moff;
 	ssize_t rv, t;
-	unsigned char *buf;
+	unsigned char *buf, *p;
 	size_t bsize;
 	uint32_t request, reply;
 	struct nmdb_srv *srv;
@@ -394,23 +414,26 @@ static int do_del(nmdb_t *db, const unsigned char *key, size_t ksize,
 		request = REQ_CACHE_DEL;
 	}
 
+	srv = select_srv(db, key, ksize);
+	moff = srv_get_msg_offset(srv);
 
 	/* Use the same buffer for the request and the reply.
 	 * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize,
 	 * 		ksize bytes key.
 	 * Reply: 4 bytes id, 4 bytes reply code.
 	 */
-	bsize = 8 + 4 + ksize;
+	bsize = moff + 8 + 4 + ksize;
 	buf = malloc(bsize);
 	if (buf == NULL)
 		return -1;
 
-	* (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE );
-	* ((uint32_t *) buf + 1) = htonl(request);
-	* ((uint32_t *) buf + 2) = htonl(ksize);
-	memcpy(buf + 3 * 4, key, ksize);
+	p = buf + moff;
+
+	* (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE );
+	* ((uint32_t *) p + 1) = htonl(request);
+	* ((uint32_t *) p + 2) = htonl(ksize);
+	memcpy(p + 3 * 4, key, ksize);
 
-	srv = select_srv(db, key, ksize);
 	t = srv_send(srv, buf, bsize);
 	if (t <= 0) {
 		rv = -1;
@@ -457,8 +480,9 @@ static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize,
 		const unsigned char *newval, size_t nvsize,
 		int impact_db)
 {
+	int moff;
 	ssize_t rv, t;
-	unsigned char *buf, *p;
+	unsigned char *buf, *p, *q;
 	size_t bsize;
 	uint32_t request, reply;
 	struct nmdb_srv *srv;
@@ -467,6 +491,8 @@ static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize,
 	if (impact_db)
 		request = REQ_CAS;
 
+	srv = select_srv(db, key, ksize);
+	moff = srv_get_msg_offset(srv);
 
 	/* Use the same buffer for the request and the reply.
 	 * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, 4
@@ -474,22 +500,24 @@ static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize,
 	 *		ovsize bytes oldval, nvsize bytes newval.
 	 * Reply: 4 bytes id, 4 bytes reply code.
 	 */
-	bsize = 4 + 4 + 4 + 4 + 4 + ksize + ovsize + nvsize;
+	bsize = moff + 4 + 4 + 4 + 4 + 4 + ksize + ovsize + nvsize;
 	buf = malloc(bsize);
 	if (buf == NULL)
 		return -1;
 
-	* (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE );
-	* ((uint32_t *) buf + 1) = htonl(request);
-	* ((uint32_t *) buf + 2) = htonl(ksize);
-	* ((uint32_t *) buf + 3) = htonl(ovsize);
-	* ((uint32_t *) buf + 4) = htonl(nvsize);
-	p = buf + 5 * 4;
-	memcpy(p, key, ksize);
-	p += ksize;
-	memcpy(p, oldval, ovsize);
-	p += ovsize;
-	memcpy(p, newval, nvsize);
+	p = buf + moff;
+
+	* (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE );
+	* ((uint32_t *) p + 1) = htonl(request);
+	* ((uint32_t *) p + 2) = htonl(ksize);
+	* ((uint32_t *) p + 3) = htonl(ovsize);
+	* ((uint32_t *) p + 4) = htonl(nvsize);
+	q = p + 5 * 4;
+	memcpy(q, key, ksize);
+	q += ksize;
+	memcpy(q, oldval, ovsize);
+	q += ovsize;
+	memcpy(q, newval, nvsize);
 
 	srv = select_srv(db, key, ksize);
 	t = srv_send(srv, buf, bsize);
diff --git a/libnmdb/tcp.c b/libnmdb/tcp.c
index 1f03f1f..6e701ed 100644
--- a/libnmdb/tcp.c
+++ b/libnmdb/tcp.c
@@ -103,15 +103,13 @@ int nmdb_add_tcp_server(nmdb_t *db, const char *addr, int port)
 }
 
 int tcp_srv_send(struct nmdb_srv *srv,
-		const unsigned char *buf, size_t bsize)
+		unsigned char *buf, size_t bsize)
 {
 	ssize_t rv;
 	uint32_t len;
 
-	len = htonl(bsize + 4);
-	rv = ssend(srv->fd, (unsigned char *) &len, 4, 0);
-	if (rv != 4)
-		return 0;
+	len = htonl(bsize);
+	memcpy(buf, (const void *) &len, 4);
 
 	rv = ssend(srv->fd, buf, bsize, 0);
 	if (rv != bsize)
@@ -119,30 +117,58 @@ int tcp_srv_send(struct nmdb_srv *srv,
 	return 1;
 }
 
+/* Read one message from fd into buf. A message starts with its total length
+ * (header included) as a 4-byte big-endian integer. Returns the number of
+ * bytes read, which may exceed that length, or <= 0 on error or EOF. */
+static ssize_t recv_msg(int fd, unsigned char *buf, size_t bsize)
+{
+	ssize_t rv, t;
+	uint32_t msgsize;
+
+	/* Common case: a single recv() returns the whole message. */
+	rv = recv(fd, buf, bsize, 0);
+	if (rv <= 0)
+		return rv;
+
+	if (rv < 4) {
+		t = srecv(fd, buf + rv, 4 - rv, 0);
+		if (t != 4 - rv)
+			return -1;
+		rv += t;
+	}
+
+	msgsize = ntohl(* ((uint32_t *) buf));
+
+	/* The length must at least cover itself, and fit in the buffer. */
+	if (msgsize < 4 || msgsize > bsize)
+		return -1;
+
+	/* Get the rest of the message; a short read means it's incomplete. */
+	if (rv < (ssize_t) msgsize) {
+		t = srecv(fd, buf + rv, msgsize - rv, 0);
+		if (t != (ssize_t) msgsize - rv)
+			return -1;
+		rv += t;
+	}
+
+	return rv;
+}
+
 /* Used internally to get and parse replies from the server. */
 uint32_t tcp_get_rep(struct nmdb_srv *srv,
 		unsigned char *buf, size_t bsize,
 		unsigned char **payload, size_t *psize)
 {
 	ssize_t rv;
-	uint32_t id, reply, msgsize;
+	uint32_t id, reply;
 
-	rv = srecv(srv->fd, (unsigned char *) &msgsize, 4, 0);
-	if (rv != 4)
+	rv = recv_msg(srv->fd, buf, bsize);
+	if (rv <= 0)
 		return -1;
 
-	msgsize = ntohl(msgsize);
-	if (bsize < msgsize)
-		return -1;
-
-	rv = srecv(srv->fd, buf, msgsize - 4, 0);
-	if (rv != msgsize - 4) {
-		return -1;
-	}
-
-	id = * (uint32_t *) buf;
+	id = * ((uint32_t *) buf + 1);
 	id = ntohl(id);
-	reply = * ((uint32_t *) buf + 1);
+	reply = * ((uint32_t *) buf + 2);
 	reply = ntohl(reply);
 
 	if (id != ID_CODE) {
@@ -150,8 +176,8 @@ uint32_t tcp_get_rep(struct nmdb_srv *srv,
 	}
 
 	if (payload != NULL) {
-		*payload = buf + 4 + 4;
-		*psize = rv - 4 - 4;
+		*payload = buf + 4 + 4 + 4;
+		*psize = rv - 4 - 4 - 4;
 	}
 	return reply;
 }