git » nmdb » commit 9b4a245

Implement multiple server support in the C library.

author Alberto Bertogli
2007-01-06 16:31:36 UTC
committer Alberto Bertogli
2007-01-06 16:31:36 UTC
parent e3d0bb3e680010c48df6a6b9659a9e71a32f3a66

Implement multiple server support in the C library.
This patch adds support for multiple servers in the C library by adding an
nmdb_add_server() function, and a way of selecting which server to talk
to based on the key of the request.

It breaks the ABI.

libnmdb/libnmdb.c +106 -27
libnmdb/nmdb.h +2 -1

diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c
index 3358b1a..36c3155 100644
--- a/libnmdb/libnmdb.c
+++ b/libnmdb/libnmdb.c
@@ -18,6 +18,8 @@
 #define ID_CODE 1
 
 
+/* Create a nmdb_t and set the first server to port. If port is < 0, the
+ * standard port is used. */
 nmdb_t *nmdb_init(int port)
 {
 	int fd;
@@ -36,11 +38,22 @@ nmdb_t *nmdb_init(int port)
 	}
 
 	db->servers = server;
-	db->nservers++;
+	db->nservers = 1;
+
+	/* the fd is shared among the different servers, because we use
+	 * sendto() directly instead of connecting sockets */
+	fd = socket(AF_TIPC, SOCK_RDM, 0);
+	if (fd < 0) {
+		free(db->servers);
+		free(db);
+		return NULL;
+	}
+	db->fd = fd;
 
 	if (port < 0)
 		port = SERVER_INST;
 
+	server->port = port;
 	server->srvsa.family = AF_TIPC;
 	server->srvsa.addrtype = TIPC_ADDR_NAMESEQ;
 	server->srvsa.addr.nameseq.type = SERVER_TYPE;
@@ -49,58 +62,101 @@ nmdb_t *nmdb_init(int port)
 	server->srvsa.scope = TIPC_CLUSTER_SCOPE;
 	server->srvlen = (socklen_t) sizeof(server->srvsa);
 
-	fd = socket(AF_TIPC, SOCK_RDM, 0);
-	if (fd < 0) {
-		free(db->servers);
-		free(db);
-		return NULL;
-	}
-	server->fd = fd;
-
 	return db;
 }
 
+/* qsort() comparator ordering servers by ascending port. It is used
+ * internally to keep the server array sorted. */
+static int compare_servers(const void *s1, const void *s2)
+{
+	const struct nmdb_srv *srv1 = s1;
+	const struct nmdb_srv *srv2 = s2;
 
-int nmdb_free(nmdb_t *db)
+	if (srv1->port < srv2->port)
+		return -1;
+	else if (srv1->port == srv2->port)
+		return 0;
+	else
+		return 1;
+}
+
+/* Add a server (port < 0: standard port); requests pick a server by key hash.
+ * Returns 1 on success, 0 if realloc() fails. Duplicate ports aren't rejected. */
+int nmdb_add_server(nmdb_t *db, int port)
 {
-	int i;
+	struct nmdb_srv *newsrv, *newarray;
 
-	for (i = 0; i < db->nservers; i++) {
-		close((db->servers[i]).fd);
+	newarray = realloc(db->servers,
+			sizeof(struct nmdb_srv) * (db->nservers + 1));
+	if (newarray == NULL) {
+		return 0;
 	}
-	free(db->servers);
+	db->servers = newarray;
+	db->nservers++;
+
+	newsrv = &(db->servers[db->nservers - 1]);
+
+	if (port < 0)
+		port = SERVER_INST;
+
+	newsrv->port = port;
+	newsrv->srvsa.family = AF_TIPC;
+	newsrv->srvsa.addrtype = TIPC_ADDR_NAMESEQ;
+	newsrv->srvsa.addr.nameseq.type = SERVER_TYPE;
+	newsrv->srvsa.addr.nameseq.lower = port;
+	newsrv->srvsa.addr.nameseq.upper = port;
+	newsrv->srvsa.scope = TIPC_CLUSTER_SCOPE;
+	newsrv->srvlen = (socklen_t) sizeof(newsrv->srvsa);
+
+	/* sort by port so the key->server mapping doesn't depend on add order */
+	qsort(db->servers, db->nservers, sizeof(struct nmdb_srv),
+			compare_servers);
+
+	return 1;
+}
+
+
+/* Frees a nmdb_t structure created with nmdb_init(): closes the shared
+ * socket and frees the server array (free(NULL) is a no-op). Returns 1. */
+int nmdb_free(nmdb_t *db)
+{
+	close(db->fd);
+	free(db->servers);
 	free(db);
 	return 1;
 }
 
 
-static int srv_send(struct nmdb_srv *srv,
+/* Send buf to srv via db's shared socket; returns 1 on success, 0 on error. */
+static int srv_send(nmdb_t *db, struct nmdb_srv *srv,
 		const unsigned char *buf, size_t bsize)
 {
 	ssize_t rv;
-	rv = sendto(srv->fd, buf, bsize, 0, (struct sockaddr *) &(srv->srvsa),
+	rv = sendto(db->fd, buf, bsize, 0, (struct sockaddr *) &(srv->srvsa),
 			srv->srvlen);
 	if (rv <= 0)
 		return 0;
 	return 1;
 }
 
-static ssize_t srv_recv(struct nmdb_srv *srv,
+/* Receive a reply into buf from db's shared socket (srv is not used). */
+static ssize_t srv_recv(nmdb_t *db, struct nmdb_srv *srv,
 		unsigned char *buf, size_t bsize)
 {
 	ssize_t rv;
-	rv = recv(srv->fd, buf, bsize, 0);
+	rv = recv(db->fd, buf, bsize, 0);
 	return rv;
 }
 
-static uint32_t get_rep(struct nmdb_srv *srv,
+/* Used internally to get and parse replies from the server. */
+static uint32_t get_rep(nmdb_t *db, struct nmdb_srv *srv,
 		unsigned char *buf, size_t bsize,
 		unsigned char **payload, size_t *psize)
 {
 	ssize_t t;
 	uint32_t id, reply;
 
-	t = srv_recv(srv, buf, bsize);
+	t = srv_recv(db, srv, buf, bsize);
 	if (t < 4 + 4) {
 		return -1;
 	}
@@ -121,10 +177,33 @@ static uint32_t get_rep(struct nmdb_srv *srv,
 	return reply;
 }
 
+/* Hash used internally by select_srv(), based on the RFC 1071 checksum.
+ * Bytes are paired explicitly (big-endian) so every byte is hashed and the
+ * result is identical on any host endianness/alignment. */
+static uint32_t checksum(const unsigned char *buf, size_t bsize)
+{
+	uint32_t sum = 0;
+
+	for (; bsize > 1; buf += 2, bsize -= 2)
+		sum += ((uint32_t) buf[0] << 8) | buf[1];
+
+	if (bsize > 0)
+		sum += (uint32_t) buf[0] << 8;
+
+	while (sum >> 16)
+		sum = (sum & 0xffff) + (sum >> 16);
+
+	return ~sum & 0xffff;
+}
+
+/* Used internally to pick the server for key: checksum(key) modulo nservers. */
 static struct nmdb_srv *select_srv(nmdb_t *db,
 		const unsigned char *key, size_t ksize)
 {
-	return &(db->servers[0]);
+	uint32_t n;
+
+	n = checksum(key, ksize) % db->nservers;
+	return &(db->servers[n]);
 }
 
 
@@ -166,13 +245,13 @@ static ssize_t do_get(nmdb_t *db,
 	reqsize = 3 * 4 + ksize;
 
 	srv = select_srv(db, key, ksize);
-	t = srv_send(srv, buf, reqsize);
+	t = srv_send(db, srv, buf, reqsize);
 	if (t <= 0) {
 		rv = -1;
 		goto exit;
 	}
 
-	reply = get_rep(srv, buf, bsize, &p, &psize);
+	reply = get_rep(db, srv, buf, bsize, &p, &psize);
 
 	if (reply == REP_CACHE_MISS || reply == REP_NOTIN) {
 		rv = 0;
@@ -254,13 +333,13 @@ static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize,
 	memcpy(p, val, vsize);
 
 	srv = select_srv(db, key, ksize);
-	t = srv_send(srv, buf, bsize);
+	t = srv_send(db, srv, buf, bsize);
 	if (t <= 0) {
 		rv = -1;
 		goto exit;
 	}
 
-	reply = get_rep(srv, buf, bsize, NULL, NULL);
+	reply = get_rep(db, srv, buf, bsize, NULL, NULL);
 
 	if (reply == REP_OK) {
 		rv = 1;
@@ -331,13 +410,13 @@ int do_del(nmdb_t *db, const unsigned char *key, size_t ksize,
 	memcpy(buf + 3 * 4, key, ksize);
 
 	srv = select_srv(db, key, ksize);
-	t = srv_send(srv, buf, bsize);
+	t = srv_send(db, srv, buf, bsize);
 	if (t <= 0) {
 		rv = -1;
 		goto exit;
 	}
 
-	reply = get_rep(srv, buf, bsize, NULL, NULL);
+	reply = get_rep(db, srv, buf, bsize, NULL, NULL);
 
 	if (reply == REP_OK) {
 		rv = 1;
diff --git a/libnmdb/nmdb.h b/libnmdb/nmdb.h
index be57129..8d0a294 100644
--- a/libnmdb/nmdb.h
+++ b/libnmdb/nmdb.h
@@ -8,17 +8,18 @@
 
 struct nmdb_srv {
 	int port;
-	int fd;
 	struct sockaddr_tipc srvsa;
 	socklen_t srvlen;
 };
 
 typedef struct nmdb_t {
+	int fd;		/* one socket shared by all servers (uses sendto()) */
 	unsigned int nservers;
 	struct nmdb_srv *servers;
 } nmdb_t;
 
 nmdb_t *nmdb_init(int port);
+int nmdb_add_server(nmdb_t *db, int port);	/* 1 on success, 0 on error */
 int nmdb_free(nmdb_t *db);
 
 ssize_t nmdb_get(nmdb_t *db, const unsigned char *key, size_t ksize,