git » nmdb » commit 1a050bc

Implement TCP support in libnmdb.

author Alberto Bertogli
2007-06-01 04:43:23 UTC
committer Alberto Bertogli
2007-06-01 04:43:23 UTC
parent 042a0dc669a4cd9a0e911cb1b278d858923c916e

Implement TCP support in libnmdb.

This breaks the API, so the bindings and test programs will need updates.
They will come in the next patches.

Things done:
 - Fully functional for single-server operation.
 - Makes valgrind happy.
 - Does not break TIPC.

Things to do:
 - Improve performance.
 - Handle multiple servers.
 - Update documentation.

Signed-off-by: Alberto Bertogli <albertito@gmail.com>

libnmdb/libnmdb.c +233 -70
libnmdb/nmdb.h +18 -6

diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c
index 6e482e4..51eaed0 100644
--- a/libnmdb/libnmdb.c
+++ b/libnmdb/libnmdb.c
@@ -7,6 +7,7 @@
 #include <arpa/inet.h>		/* htonls() and friends */
 #include <string.h>		/* memcpy() */
 #include <unistd.h>		/* close() */
+#include <netinet/tcp.h>	/* TCP_NODELAY */
 
 #include <stdio.h>
 
@@ -17,50 +18,72 @@
  * are implemented. */
 #define ID_CODE 1
 
+/* Different connection types. Used internally to differentiate between TIPC
+ * and TCP connections in struct nmdb_srv. */
+#define TIPC_CONN 1
+#define TCP_CONN 2
 
-/* Create a nmdb_t and set the first server to port. If port is < 0, the
- * standard port is used. */
-nmdb_t *nmdb_init(int port)
+
+/* Like recv() but either fails or returns a complete read; if we return less
+ * than count, it is because EOF was reached. */
+static ssize_t srecv(int fd, unsigned char *buf, size_t count, int flags)
 {
-	int fd;
-	nmdb_t *db;
-	struct nmdb_srv *server;
+	ssize_t rv, c;
 
-	db = malloc(sizeof(nmdb_t));
-	if (db == NULL) {
-		return NULL;
+	c = 0;
+
+	while (c < count) {
+		rv = recv(fd, buf + c, count - c, flags);
+
+		if (rv == count)
+			return count;
+		else if (rv < 0)
+			return rv;
+		else if (rv == 0)
+			return c;
+
+		c += rv;
 	}
+	return count;
+}
 
-	server = malloc(sizeof(struct nmdb_srv));
-	if (server == NULL) {
-		free(db);
-		return NULL;
+/* Like srecv() but for send(); returns -1 on error. */
+static ssize_t ssend(int fd, const unsigned char *buf, size_t count, int flags)
+{
+	ssize_t rv, c;
+
+	c = 0;
+
+	while (c < count) {
+		rv = send(fd, buf + c, count - c, flags);
+
+		if (rv == count)
+			return count;
+		else if (rv < 0)
+			return rv;
+		else if (rv == 0)
+			return c;
+
+		c += rv;
 	}
+	return count;
+}
 
-	db->servers = server;
-	db->nservers = 1;
 
-	/* the fd is shared among the different servers, because we use
-	 * sendto() directly instead of connecting sockets */
-	fd = socket(AF_TIPC, SOCK_RDM, 0);
-	if (fd < 0) {
-		free(db->servers);
-		free(db);
+
+/* Create an empty nmdb_t; servers are added afterwards with
+ * nmdb_add_tipc_server() or nmdb_add_tcp_server(). Returns NULL on error. */
+nmdb_t *nmdb_init(void)
+{
+	nmdb_t *db;
+
+	db = malloc(sizeof(nmdb_t));
+	if (db == NULL) {
 		return NULL;
 	}
-	db->fd = fd;
 
-	if (port < 0)
-		port = SERVER_INST;
-
-	server->port = port;
-	server->srvsa.family = AF_TIPC;
-	server->srvsa.addrtype = TIPC_ADDR_NAMESEQ;
-	server->srvsa.addr.nameseq.type = SERVER_TYPE;
-	server->srvsa.addr.nameseq.lower = port;
-	server->srvsa.addr.nameseq.upper = port;
-	server->srvsa.scope = TIPC_CLUSTER_SCOPE;
-	server->srvlen = (socklen_t) sizeof(server->srvsa);
+	db->servers = NULL;
+	db->nservers = 0;
 
 	return db;
 }
@@ -72,18 +95,19 @@ static int compare_servers(const void *s1, const void *s2)
 	struct nmdb_srv *srv1 = (struct nmdb_srv *) s1;
 	struct nmdb_srv *srv2 = (struct nmdb_srv *) s2;
 
-	if (srv1->port < srv2->port)
+	if (srv1->id < srv2->id)
 		return -1;
-	else if (srv1->port == srv2->port)
+	else if (srv1->id == srv2->id)
 		return 0;
 	else
 		return 1;
 }
 
-/* Add a server to the db connection. Requests will select which server to use
- * by hashing the key. */
-int nmdb_add_server(nmdb_t *db, int port)
+/* Add a TIPC server (port < 0 means SERVER_INST); returns 1 on success, 0 on
+ * error. Requests select which server to use by hashing the key. */
+int nmdb_add_tipc_server(nmdb_t *db, int port)
 {
+	int fd;
 	struct nmdb_srv *newsrv, *newarray;
 
 	newarray = realloc(db->servers,
@@ -96,17 +120,93 @@ int nmdb_add_server(nmdb_t *db, int port)
 
 	newsrv = &(db->servers[db->nservers - 1]);
 
+	fd = socket(AF_TIPC, SOCK_RDM, 0);
+	if (fd < 0) {
+		/* forget the slot we just added, it has no valid fd */
+		db->nservers--;
+		return 0;
+	}
+	newsrv->fd = fd;
+
 	if (port < 0)
 		port = SERVER_INST;
 
-	newsrv->port = port;
-	newsrv->srvsa.family = AF_TIPC;
-	newsrv->srvsa.addrtype = TIPC_ADDR_NAMESEQ;
-	newsrv->srvsa.addr.nameseq.type = SERVER_TYPE;
-	newsrv->srvsa.addr.nameseq.lower = port;
-	newsrv->srvsa.addr.nameseq.upper = port;
-	newsrv->srvsa.scope = TIPC_CLUSTER_SCOPE;
-	newsrv->srvlen = (socklen_t) sizeof(newsrv->srvsa);
+	newsrv->info.tipc.port = port;
+	newsrv->info.tipc.srvsa.family = AF_TIPC;
+	newsrv->info.tipc.srvsa.addrtype = TIPC_ADDR_NAMESEQ;
+	newsrv->info.tipc.srvsa.addr.nameseq.type = SERVER_TYPE;
+	newsrv->info.tipc.srvsa.addr.nameseq.lower = port;
+	newsrv->info.tipc.srvsa.addr.nameseq.upper = port;
+	newsrv->info.tipc.srvsa.scope = TIPC_CLUSTER_SCOPE;
+	newsrv->info.tipc.srvlen = (socklen_t) sizeof(newsrv->info.tipc.srvsa);
+
+	/* we use the port as an id for sorting */
+	newsrv->id = port;
+
+	newsrv->type = TIPC_CONN;
+
+	/* keep the list sorted by id, so we can do a reliable selection */
+	qsort(db->servers, db->nservers, sizeof(struct nmdb_srv),
+			compare_servers);
+
+	return 1;
+}
+
+
+/* Same as nmdb_add_tipc_server() but for TCP connections; addr is an IPv4
+ * address in dotted-decimal notation. Returns 1 on success, 0 on error. */
+int nmdb_add_tcp_server(nmdb_t *db, const char *addr, int port)
+{
+	int rv, fd;
+	struct nmdb_srv *newsrv, *newarray;
+
+	newarray = realloc(db->servers,
+			sizeof(struct nmdb_srv) * (db->nservers + 1));
+	if (newarray == NULL) {
+		return 0;
+	}
+	db->servers = newarray;
+
+	/* Set up the spare slot at the end of the array, but only count it in
+	 * db->nservers once it is complete, so that an error below never leaves
+	 * a half-initialized server (or a leaked fd) behind. */
+	newsrv = &(db->servers[db->nservers]);
+
+	fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (fd < 0)
+		return 0;
+
+	newsrv->fd = fd;
+	memset(&(newsrv->info.tcp.srvsa), 0, sizeof(newsrv->info.tcp.srvsa));
+	newsrv->info.tcp.srvsa.sin_family = AF_INET;
+	newsrv->info.tcp.srvsa.sin_port = htons(port);
+	newsrv->info.tcp.srvlen = (socklen_t) sizeof(newsrv->info.tcp.srvsa);
+	rv = inet_pton(AF_INET, addr, &(newsrv->info.tcp.srvsa.sin_addr));
+	if (rv <= 0) {
+		close(fd);
+		return 0;
+	}
+
+	rv = connect(fd, (struct sockaddr *) &(newsrv->info.tcp.srvsa),
+			newsrv->info.tcp.srvlen);
+	if (rv < 0) {
+		close(fd);
+		return 0;
+	}
+
+	/* Disable Nagle algorithm because we often send small packets. Huge
+	 * gain in performance. */
+	rv = 1;
+	if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &rv, sizeof(rv)) < 0) {
+		close(fd);
+		return 0;
+	}
+
+	/* FIXME: find a decent ID to use */
+	newsrv->id = port;
+
+	newsrv->type = TCP_CONN;
+	db->nservers++;
 
 	/* keep the list sorted by port, so we can do a reliable selection */
 	qsort(db->servers, db->nservers, sizeof(struct nmdb_srv),
@@ -119,45 +207,78 @@ int nmdb_add_server(nmdb_t *db, int port)
 /* Frees a nmdb_t structure created with nmdb_init(). */
 int nmdb_free(nmdb_t *db)
 {
-	close(db->fd);
-	if (db->servers != NULL)
+	if (db->servers != NULL) {
+		unsigned int i;
+		for (i = 0; i < db->nservers; i++)
+			close(db->servers[i].fd);
 		free(db->servers);
+	}
 	free(db);
 	return 1;
 }
 
 
-/* Used internally to send a buffer to the given server. */
-static int srv_send(nmdb_t *db, struct nmdb_srv *srv,
+/* Send a buffer to a TIPC server, using the address stored in srv.
+ * Returns 1 on success, 0 on error. */
+static int tipc_srv_send(struct nmdb_srv *srv,
 		const unsigned char *buf, size_t bsize)
 {
 	ssize_t rv;
-	rv = sendto(db->fd, buf, bsize, 0, (struct sockaddr *) &(srv->srvsa),
-			srv->srvlen);
+	rv = sendto(srv->fd, buf, bsize, 0,
+			(struct sockaddr *) &(srv->info.tipc.srvsa),
+			srv->info.tipc.srvlen);
 	if (rv <= 0)
 		return 0;
 	return 1;
 }
 
-/* Used internally to receive a buffer. */
-static ssize_t srv_recv(nmdb_t *db, struct nmdb_srv *srv,
-		unsigned char *buf, size_t bsize)
+/* Send a buffer to a TCP server. Each message is framed by a 4-byte length,
+ * in network byte order, which counts itself plus the buffer. Returns 1 on
+ * success, 0 on error. */
+static int tcp_srv_send(struct nmdb_srv *srv,
+		const unsigned char *buf, size_t bsize)
 {
 	ssize_t rv;
-	rv = recv(db->fd, buf, bsize, 0);
-	return rv;
+	uint32_t len;
+
+	len = htonl(bsize + 4);
+	/* MSG_MORE lets the kernel merge the length and the buffer into one
+	 * segment despite TCP_NODELAY; MSG_NOSIGNAL turns a dropped connection
+	 * into an error instead of a process-killing SIGPIPE. */
+	rv = ssend(srv->fd, (unsigned char *) &len, 4, MSG_MORE | MSG_NOSIGNAL);
+	if (rv != 4)
+		return 0;
+
+	rv = ssend(srv->fd, buf, bsize, MSG_NOSIGNAL);
+	if (rv < 0 || (size_t) rv != bsize)
+		return 0;
+	return 1;
+}
+
+/* Used internally to send a buffer to the given server. Calls the appropriate
+ * sender according to the server protocol. */
+static int srv_send(struct nmdb_srv *srv,
+		const unsigned char *buf, size_t bsize)
+{
+	if (srv->type == TIPC_CONN)
+		return tipc_srv_send(srv, buf, bsize);
+	else if (srv->type == TCP_CONN)
+		return tcp_srv_send(srv, buf, bsize);
+	else
+		return 0;
 }
 
+
 /* Used internally to get and parse replies from the server. */
-static uint32_t get_rep(nmdb_t *db, struct nmdb_srv *srv,
+static uint32_t tipc_get_rep(struct nmdb_srv *srv,
 		unsigned char *buf, size_t bsize,
 		unsigned char **payload, size_t *psize)
 {
-	ssize_t t;
+	ssize_t rv;
 	uint32_t id, reply;
 
-	t = srv_recv(db, srv, buf, bsize);
-	if (t < 4 + 4) {
+	rv = recv(srv->fd, buf, bsize, 0);
+	if (rv < 4 + 4) {
 		return -1;
 	}
 
@@ -172,11 +285,71 @@ static uint32_t get_rep(nmdb_t *db, struct nmdb_srv *srv,
 
 	if (payload != NULL) {
 		*payload = buf + 4 + 4;
-		*psize = t - 4 - 4;
+		*psize = rv - 4 - 4;
 	}
 	return reply;
 }
 
+/* Used internally to get and parse replies from a TCP server. A reply is a
+ * 4-byte length (counting itself, in network byte order) followed by the
+ * 4-byte id, the 4-byte reply code and the payload, if any. Returns the reply
+ * code, or (uint32_t) -1 on error. */
+static uint32_t tcp_get_rep(struct nmdb_srv *srv,
+		unsigned char *buf, size_t bsize,
+		unsigned char **payload, size_t *psize)
+{
+	ssize_t rv;
+	uint32_t id, reply, msgsize;
+
+	rv = srecv(srv->fd, (unsigned char *) &msgsize, 4, 0);
+	if (rv != 4)
+		return -1;
+
+	/* Reject messages too short to hold the length, id and reply code
+	 * (which would also make msgsize - 4 wrap around and overflow buf),
+	 * and messages whose body does not fit in buf. */
+	msgsize = ntohl(msgsize);
+	if (msgsize < 4 + 4 + 4 || bsize < msgsize - 4)
+		return -1;
+
+	rv = srecv(srv->fd, buf, msgsize - 4, 0);
+	if (rv < 0 || (size_t) rv != msgsize - 4) {
+		return -1;
+	}
+
+	/* memcpy() avoids unaligned and type-punned reads from buf */
+	memcpy(&id, buf, 4);
+	id = ntohl(id);
+	memcpy(&reply, buf + 4, 4);
+	reply = ntohl(reply);
+
+	if (id != ID_CODE) {
+		return -1;
+	}
+
+	if (payload != NULL) {
+		*payload = buf + 4 + 4;
+		*psize = rv - 4 - 4;
+	}
+	return reply;
+}
+
+/* Used internally to get and parse replies from the given server. Calls the
+ * appropriate receiver according to the server protocol. Returns the reply
+ * code, or (uint32_t) -1 on error. */
+static uint32_t get_rep(struct nmdb_srv *srv,
+		unsigned char *buf, size_t bsize,
+		unsigned char **payload, size_t *psize)
+{
+	if (srv->type == TIPC_CONN)
+		return tipc_get_rep(srv, buf, bsize, payload, psize);
+	else if (srv->type == TCP_CONN)
+		return tcp_get_rep(srv, buf, bsize, payload, psize);
+	else
+		return -1;
+}
+
+
 /* Hash function used internally by select_srv(). See RFC 1071. */
 static uint32_t checksum(const unsigned char *buf, size_t bsize)
 {
@@ -245,13 +408,13 @@ static ssize_t do_get(nmdb_t *db,
 	reqsize = 3 * 4 + ksize;
 
 	srv = select_srv(db, key, ksize);
-	t = srv_send(db, srv, buf, reqsize);
+	t = srv_send(srv, buf, reqsize);
 	if (t <= 0) {
 		rv = -1;
 		goto exit;
 	}
 
-	reply = get_rep(db, srv, buf, bsize, &p, &psize);
+	reply = get_rep(srv, buf, bsize, &p, &psize);
 
 	if (reply == REP_CACHE_MISS || reply == REP_NOTIN) {
 		rv = 0;
@@ -333,13 +496,13 @@ static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize,
 	memcpy(p, val, vsize);
 
 	srv = select_srv(db, key, ksize);
-	t = srv_send(db, srv, buf, bsize);
+	t = srv_send(srv, buf, bsize);
 	if (t <= 0) {
 		rv = -1;
 		goto exit;
 	}
 
-	reply = get_rep(db, srv, buf, bsize, NULL, NULL);
+	reply = get_rep(srv, buf, bsize, NULL, NULL);
 
 	if (reply == REP_OK) {
 		rv = 1;
@@ -410,13 +573,13 @@ static int do_del(nmdb_t *db, const unsigned char *key, size_t ksize,
 	memcpy(buf + 3 * 4, key, ksize);
 
 	srv = select_srv(db, key, ksize);
-	t = srv_send(db, srv, buf, bsize);
+	t = srv_send(srv, buf, bsize);
 	if (t <= 0) {
 		rv = -1;
 		goto exit;
 	}
 
-	reply = get_rep(db, srv, buf, bsize, NULL, NULL);
+	reply = get_rep(srv, buf, bsize, NULL, NULL);
 
 	if (reply == REP_OK) {
 		rv = 1;
@@ -491,13 +654,13 @@ static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize,
 	memcpy(p, newval, nvsize);
 
 	srv = select_srv(db, key, ksize);
-	t = srv_send(db, srv, buf, bsize);
+	t = srv_send(srv, buf, bsize);
 	if (t <= 0) {
 		rv = -1;
 		goto exit;
 	}
 
-	reply = get_rep(db, srv, buf, bsize, NULL, NULL);
+	reply = get_rep(srv, buf, bsize, NULL, NULL);
 
 	if (reply == REP_OK) {
 		rv = 2;
diff --git a/libnmdb/nmdb.h b/libnmdb/nmdb.h
index 7fc0994..9906033 100644
--- a/libnmdb/nmdb.h
+++ b/libnmdb/nmdb.h
@@ -5,21 +5,33 @@
 #include <sys/types.h>		/* socket defines */
 #include <sys/socket.h>		/* socklen_t */
 #include <linux/tipc.h>		/* struct sockaddr_tipc */
+#include <netinet/in.h>		/* struct sockaddr_in */
 
 struct nmdb_srv {
-	int port;
-	struct sockaddr_tipc srvsa;
-	socklen_t srvlen;
+	int fd;			/* socket used to talk to this server */
+	int type;		/* TIPC_CONN or TCP_CONN (see libnmdb.c) */
+	union {
+		struct {
+			unsigned int port;
+			struct sockaddr_tipc srvsa;
+			socklen_t srvlen;
+		} tipc;
+		struct {
+			struct sockaddr_in srvsa;
+			socklen_t srvlen;
+		} tcp;
+	} info;			/* address; the valid member depends on type */
+	unsigned long id;	/* key used to keep the server list sorted */
 };
 
 typedef struct nmdb_t {
-	int fd;
 	unsigned int nservers;
 	struct nmdb_srv *servers;
 } nmdb_t;
 
-nmdb_t *nmdb_init(int port);
-int nmdb_add_server(nmdb_t *db, int port);
+nmdb_t *nmdb_init(void);
+int nmdb_add_tipc_server(nmdb_t *db, int port);
+int nmdb_add_tcp_server(nmdb_t *db, const char *addr, int port);
 int nmdb_free(nmdb_t *db);
 
 ssize_t nmdb_get(nmdb_t *db, const unsigned char *key, size_t ksize,