author: Alberto Bertogli <albertito@gmail.com>, 2007-01-06 16:31:36 UTC
committer: Alberto Bertogli <albertito@gmail.com>, 2007-01-06 16:31:36 UTC
parent: e3d0bb3e680010c48df6a6b9659a9e71a32f3a66
libnmdb/libnmdb.c | +106 -27
libnmdb/nmdb.h | +2 -1
diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c index 3358b1a..36c3155 100644 --- a/libnmdb/libnmdb.c +++ b/libnmdb/libnmdb.c @@ -18,6 +18,8 @@ #define ID_CODE 1 +/* Create a nmdb_t and set the first server to port. If port is < 0, the + * standard port is used. */ nmdb_t *nmdb_init(int port) { int fd; @@ -36,11 +38,22 @@ nmdb_t *nmdb_init(int port) } db->servers = server; - db->nservers++; + db->nservers = 1; + + /* the fd is shared among the different servers, because we use + * sendto() directly instead of connecting sockets */ + fd = socket(AF_TIPC, SOCK_RDM, 0); + if (fd < 0) { + free(db->servers); + free(db); + return NULL; + } + db->fd = fd; if (port < 0) port = SERVER_INST; + server->port = port; server->srvsa.family = AF_TIPC; server->srvsa.addrtype = TIPC_ADDR_NAMESEQ; server->srvsa.addr.nameseq.type = SERVER_TYPE; @@ -49,58 +62,101 @@ nmdb_t *nmdb_init(int port) server->srvsa.scope = TIPC_CLUSTER_SCOPE; server->srvlen = (socklen_t) sizeof(server->srvsa); - fd = socket(AF_TIPC, SOCK_RDM, 0); - if (fd < 0) { - free(db->servers); - free(db); - return NULL; - } - server->fd = fd; - return db; } +/* Compare two servers, using their ports. It is used internally to keep the + * server array sorted with qsort(). */ +static int compare_servers(const void *s1, const void *s2) +{ + struct nmdb_srv *srv1 = (struct nmdb_srv *) s1; + struct nmdb_srv *srv2 = (struct nmdb_srv *) s2; -int nmdb_free(nmdb_t *db) + if (srv1->port < srv2->port) + return -1; + else if (srv1->port == srv2->port) + return 0; + else + return 1; +} + +/* Add a server to the db connection. Requests will select which server to use + * by hashing the key. 
*/ +int nmdb_add_server(nmdb_t *db, int port) { - int i; + struct nmdb_srv *newsrv, *newarray; - for (i = 0; i < db->nservers; i++) { - close((db->servers[i]).fd); + newarray = realloc(db->servers, + sizeof(struct nmdb_srv) * (db->nservers + 1)); + if (newarray == NULL) { + return 0; } - free(db->servers); + db->servers = newarray; + db->nservers++; + + newsrv = &(db->servers[db->nservers - 1]); + + if (port < 0) + port = SERVER_INST; + + newsrv->port = port; + newsrv->srvsa.family = AF_TIPC; + newsrv->srvsa.addrtype = TIPC_ADDR_NAMESEQ; + newsrv->srvsa.addr.nameseq.type = SERVER_TYPE; + newsrv->srvsa.addr.nameseq.lower = port; + newsrv->srvsa.addr.nameseq.upper = port; + newsrv->srvsa.scope = TIPC_CLUSTER_SCOPE; + newsrv->srvlen = (socklen_t) sizeof(newsrv->srvsa); + + /* keep the list sorted by port, so we can do a reliable selection */ + qsort(db->servers, db->nservers, sizeof(struct nmdb_srv), + compare_servers); + + return 1; +} + + +/* Frees a nmdb_t structure created with nmdb_init(). */ +int nmdb_free(nmdb_t *db) +{ + close(db->fd); + if (db->servers != NULL) + free(db->servers); free(db); return 1; } -static int srv_send(struct nmdb_srv *srv, +/* Used internally to send a buffer to the given server. */ +static int srv_send(nmdb_t *db, struct nmdb_srv *srv, const unsigned char *buf, size_t bsize) { ssize_t rv; - rv = sendto(srv->fd, buf, bsize, 0, (struct sockaddr *) &(srv->srvsa), + rv = sendto(db->fd, buf, bsize, 0, (struct sockaddr *) &(srv->srvsa), srv->srvlen); if (rv <= 0) return 0; return 1; } -static ssize_t srv_recv(struct nmdb_srv *srv, +/* Used internally to receive a buffer. */ +static ssize_t srv_recv(nmdb_t *db, struct nmdb_srv *srv, unsigned char *buf, size_t bsize) { ssize_t rv; - rv = recv(srv->fd, buf, bsize, 0); + rv = recv(db->fd, buf, bsize, 0); return rv; } -static uint32_t get_rep(struct nmdb_srv *srv, +/* Used internally to get and parse replies from the server. 
*/ +static uint32_t get_rep(nmdb_t *db, struct nmdb_srv *srv, unsigned char *buf, size_t bsize, unsigned char **payload, size_t *psize) { ssize_t t; uint32_t id, reply; - t = srv_recv(srv, buf, bsize); + t = srv_recv(db, srv, buf, bsize); if (t < 4 + 4) { return -1; } @@ -121,10 +177,33 @@ static uint32_t get_rep(struct nmdb_srv *srv, return reply; } +/* Hash function used internally by select_srv(). See RFC 1071. */ +uint32_t checksum(const unsigned char *buf, size_t bsize) +{ + uint32_t sum = 0; + + while (bsize > 1) { + sum += * (uint16_t *) buf++; + bsize -= 2; + } + + if (bsize > 0) + sum += * (uint8_t *) buf; + + while (sum >> 16) + sum = (sum & 0xffff) + (sum >> 16); + + return ~sum; +} + +/* Used internally to select which server to use for the given key. */ static struct nmdb_srv *select_srv(nmdb_t *db, const unsigned char *key, size_t ksize) { - return &(db->servers[0]); + uint32_t n; + + n = checksum(key, ksize) % db->nservers; + return &(db->servers[n]); } @@ -166,13 +245,13 @@ static ssize_t do_get(nmdb_t *db, reqsize = 3 * 4 + ksize; srv = select_srv(db, key, ksize); - t = srv_send(srv, buf, reqsize); + t = srv_send(db, srv, buf, reqsize); if (t <= 0) { rv = -1; goto exit; } - reply = get_rep(srv, buf, bsize, &p, &psize); + reply = get_rep(db, srv, buf, bsize, &p, &psize); if (reply == REP_CACHE_MISS || reply == REP_NOTIN) { rv = 0; @@ -254,13 +333,13 @@ static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize, memcpy(p, val, vsize); srv = select_srv(db, key, ksize); - t = srv_send(srv, buf, bsize); + t = srv_send(db, srv, buf, bsize); if (t <= 0) { rv = -1; goto exit; } - reply = get_rep(srv, buf, bsize, NULL, NULL); + reply = get_rep(db, srv, buf, bsize, NULL, NULL); if (reply == REP_OK) { rv = 1; @@ -331,13 +410,13 @@ int do_del(nmdb_t *db, const unsigned char *key, size_t ksize, memcpy(buf + 3 * 4, key, ksize); srv = select_srv(db, key, ksize); - t = srv_send(srv, buf, bsize); + t = srv_send(db, srv, buf, bsize); if (t <= 0) { rv = -1; 
goto exit; } - reply = get_rep(srv, buf, bsize, NULL, NULL); + reply = get_rep(db, srv, buf, bsize, NULL, NULL); if (reply == REP_OK) { rv = 1; diff --git a/libnmdb/nmdb.h b/libnmdb/nmdb.h index be57129..8d0a294 100644 --- a/libnmdb/nmdb.h +++ b/libnmdb/nmdb.h @@ -8,17 +8,18 @@ struct nmdb_srv { int port; - int fd; struct sockaddr_tipc srvsa; socklen_t srvlen; }; typedef struct nmdb_t { + int fd; unsigned int nservers; struct nmdb_srv *servers; } nmdb_t; nmdb_t *nmdb_init(int port); +int nmdb_add_server(nmdb_t *db, int port); int nmdb_free(nmdb_t *db); ssize_t nmdb_get(nmdb_t *db, const unsigned char *key, size_t ksize,