author | Alberto Bertogli
<albertito@gmail.com> 2007-06-01 04:43:23 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2007-06-01 04:43:23 UTC |
parent | 042a0dc669a4cd9a0e911cb1b278d858923c916e |
libnmdb/libnmdb.c | +233 | -70 |
libnmdb/nmdb.h | +18 | -6 |
diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c index 6e482e4..51eaed0 100644 --- a/libnmdb/libnmdb.c +++ b/libnmdb/libnmdb.c @@ -7,6 +7,7 @@ #include <arpa/inet.h> /* htonls() and friends */ #include <string.h> /* memcpy() */ #include <unistd.h> /* close() */ +#include <netinet/tcp.h> /* TCP stuff */ #include <stdio.h> @@ -17,50 +18,72 @@ * are implemented. */ #define ID_CODE 1 +/* Different connection types. Used internally to differentiate between TIPC + * and TCP connections in struct nmdb_srv. */ +#define TIPC_CONN 1 +#define TCP_CONN 2 -/* Create a nmdb_t and set the first server to port. If port is < 0, the - * standard port is used. */ -nmdb_t *nmdb_init(int port) + +/* Like recv() but either fails, or returns a complete read; if we return less + * than count is because EOF was reached */ +ssize_t srecv(int fd, unsigned char *buf, size_t count, int flags) { - int fd; - nmdb_t *db; - struct nmdb_srv *server; + ssize_t rv, c; - db = malloc(sizeof(nmdb_t)); - if (db == NULL) { - return NULL; + c = 0; + + while (c < count) { + rv = recv(fd, buf + c, count - c, flags); + + if (rv == count) + return count; + else if (rv < 0) + return rv; + else if (rv == 0) + return c; + + c += rv; } + return count; +} - server = malloc(sizeof(struct nmdb_srv)); - if (server == NULL) { - free(db); - return NULL; +/* Like srecv() but for send() */ +ssize_t ssend(int fd, const unsigned char *buf, size_t count, int flags) +{ + ssize_t rv, c; + + c = 0; + + while (c < count) { + rv = send(fd, buf + c, count - c, flags); + + if (rv == count) + return count; + else if (rv < 0) + return rv; + else if (rv == 0) + return c; + + c += rv; } + return count; +} - db->servers = server; - db->nservers = 1; - /* the fd is shared among the different servers, because we use - * sendto() directly instead of connecting sockets */ - fd = socket(AF_TIPC, SOCK_RDM, 0); - if (fd < 0) { - free(db->servers); - free(db); + +/* Create a nmdb_t and set the first server to port. If port is < 0, the + * standard port is used. */ +nmdb_t *nmdb_init() +{ + nmdb_t *db; + + db = malloc(sizeof(nmdb_t)); + if (db == NULL) { return NULL; } - db->fd = fd; - if (port < 0) - port = SERVER_INST; - - server->port = port; - server->srvsa.family = AF_TIPC; - server->srvsa.addrtype = TIPC_ADDR_NAMESEQ; - server->srvsa.addr.nameseq.type = SERVER_TYPE; - server->srvsa.addr.nameseq.lower = port; - server->srvsa.addr.nameseq.upper = port; - server->srvsa.scope = TIPC_CLUSTER_SCOPE; - server->srvlen = (socklen_t) sizeof(server->srvsa); + db->servers = NULL; + db->nservers = 0; return db; } @@ -72,18 +95,19 @@ static int compare_servers(const void *s1, const void *s2) struct nmdb_srv *srv1 = (struct nmdb_srv *) s1; struct nmdb_srv *srv2 = (struct nmdb_srv *) s2; - if (srv1->port < srv2->port) + if (srv1->id < srv2->id) return -1; - else if (srv1->port == srv2->port) + else if (srv1->id == srv2->id) return 0; else return 1; } -/* Add a server to the db connection. Requests will select which server to use - * by hashing the key. */ -int nmdb_add_server(nmdb_t *db, int port) +/* Add a TIPC server to the db connection. Requests will select which server + * to use by hashing the key. */ +int nmdb_add_tipc_server(nmdb_t *db, int port) { + int fd; struct nmdb_srv *newsrv, *newarray; newarray = realloc(db->servers, @@ -96,17 +120,81 @@ int nmdb_add_server(nmdb_t *db, int port) newsrv = &(db->servers[db->nservers - 1]); + fd = socket(AF_TIPC, SOCK_RDM, 0); + if (fd < 0) { + return 0; + } + newsrv->fd = fd; + if (port < 0) port = SERVER_INST; - newsrv->port = port; - newsrv->srvsa.family = AF_TIPC; - newsrv->srvsa.addrtype = TIPC_ADDR_NAMESEQ; - newsrv->srvsa.addr.nameseq.type = SERVER_TYPE; - newsrv->srvsa.addr.nameseq.lower = port; - newsrv->srvsa.addr.nameseq.upper = port; - newsrv->srvsa.scope = TIPC_CLUSTER_SCOPE; - newsrv->srvlen = (socklen_t) sizeof(newsrv->srvsa); + newsrv->info.tipc.port = port; + newsrv->info.tipc.srvsa.family = AF_TIPC; + newsrv->info.tipc.srvsa.addrtype = TIPC_ADDR_NAMESEQ; + newsrv->info.tipc.srvsa.addr.nameseq.type = SERVER_TYPE; + newsrv->info.tipc.srvsa.addr.nameseq.lower = port; + newsrv->info.tipc.srvsa.addr.nameseq.upper = port; + newsrv->info.tipc.srvsa.scope = TIPC_CLUSTER_SCOPE; + newsrv->info.tipc.srvlen = (socklen_t) sizeof(newsrv->info.tipc.srvsa); + + /* we use the port as an id for sorting */ + newsrv->id = port; + + newsrv->type = TIPC_CONN; + + /* keep the list sorted by port, so we can do a reliable selection */ + qsort(db->servers, db->nservers, sizeof(struct nmdb_srv), + compare_servers); + + return 1; +} + + +/* Same as nmdb_add_tipc_server() but for TCP connections. */ +int nmdb_add_tcp_server(nmdb_t *db, const char *addr, int port) +{ + int rv, fd; + struct nmdb_srv *newsrv, *newarray; + + newarray = realloc(db->servers, + sizeof(struct nmdb_srv) * (db->nservers + 1)); + if (newarray == NULL) { + return 0; + } + db->servers = newarray; + db->nservers++; + + newsrv = &(db->servers[db->nservers - 1]); + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) + return 0; + + newsrv->fd = fd; + newsrv->info.tcp.srvsa.sin_family = AF_INET; + newsrv->info.tcp.srvsa.sin_port = htons(port); + rv = inet_pton(AF_INET, addr, &(newsrv->info.tcp.srvsa.sin_addr)); + if (rv <= 0) + return 0; + + rv = connect(fd, (struct sockaddr *) &(newsrv->info.tcp.srvsa), + sizeof(newsrv->info.tcp.srvsa)); + if (rv < 0) + return 0; + + /* Disable Nagle algorithm because we often send small packets. Huge + * gain in performance. */ + rv = 1; + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &rv, sizeof(rv)) < 0 ) { + close(fd); + return -1; + } + + /* FIXME: find a decent ID to use */ + newsrv->id = port; + + newsrv->type = TCP_CONN; /* keep the list sorted by port, so we can do a reliable selection */ qsort(db->servers, db->nservers, sizeof(struct nmdb_srv), @@ -119,45 +207,70 @@ int nmdb_add_server(nmdb_t *db, int port) /* Frees a nmdb_t structure created with nmdb_init(). */ int nmdb_free(nmdb_t *db) { - close(db->fd); - if (db->servers != NULL) + if (db->servers != NULL) { + int i; + for (i = 0; i < db->nservers; i++) + close(db->servers[i].fd); free(db->servers); + } free(db); return 1; } -/* Used internally to send a buffer to the given server. */ -static int srv_send(nmdb_t *db, struct nmdb_srv *srv, +static int tipc_srv_send(struct nmdb_srv *srv, const unsigned char *buf, size_t bsize) { ssize_t rv; - rv = sendto(db->fd, buf, bsize, 0, (struct sockaddr *) &(srv->srvsa), - srv->srvlen); + rv = sendto(srv->fd, buf, bsize, 0, + (struct sockaddr *) &(srv->info.tipc.srvsa), + srv->info.tipc.srvlen); if (rv <= 0) return 0; return 1; } -/* Used internally to receive a buffer. */ -static ssize_t srv_recv(nmdb_t *db, struct nmdb_srv *srv, - unsigned char *buf, size_t bsize) +static int tcp_srv_send(struct nmdb_srv *srv, + const unsigned char *buf, size_t bsize) { ssize_t rv; - rv = recv(db->fd, buf, bsize, 0); - return rv; + uint32_t len; + + len = htonl(bsize + 4); + rv = ssend(srv->fd, (unsigned char *) &len, 4, 0); + if (rv != 4) + return 0; + + rv = ssend(srv->fd, buf, bsize, 0); + if (rv != bsize) + return 0; + return 1; +} + +/* Used internally to send a buffer to the given server. Calls the appropriate + * sender according to the server protocol. */ +static int srv_send(struct nmdb_srv *srv, + const unsigned char *buf, size_t bsize) +{ + if (srv->type == TIPC_CONN) + return tipc_srv_send(srv, buf, bsize); + else if (srv->type == TCP_CONN) + return tcp_srv_send(srv, buf, bsize); + else + return 0; } + /* Used internally to get and parse replies from the server. */ -static uint32_t get_rep(nmdb_t *db, struct nmdb_srv *srv, +static uint32_t tipc_get_rep(struct nmdb_srv *srv, unsigned char *buf, size_t bsize, unsigned char **payload, size_t *psize) { - ssize_t t; + ssize_t rv; uint32_t id, reply; - t = srv_recv(db, srv, buf, bsize); - if (t < 4 + 4) { + rv = recv(srv->fd, buf, bsize, 0); + if (rv < 4 + 4) { return -1; } @@ -172,11 +285,61 @@ static uint32_t get_rep(nmdb_t *db, struct nmdb_srv *srv, if (payload != NULL) { *payload = buf + 4 + 4; - *psize = t - 4 - 4; + *psize = rv - 4 - 4; } return reply; } +/* Used internally to get and parse replies from the server. */ +static uint32_t tcp_get_rep(struct nmdb_srv *srv, + unsigned char *buf, size_t bsize, + unsigned char **payload, size_t *psize) +{ + ssize_t rv; + uint32_t id, reply, msgsize; + + rv = srecv(srv->fd, (unsigned char *) &msgsize, 4, 0); + if (rv != 4) + return -1; + + msgsize = ntohl(msgsize); + if (bsize < msgsize) + return -1; + + rv = srecv(srv->fd, buf, msgsize - 4, 0); + if (rv != msgsize - 4) { + return -1; + } + + id = * (uint32_t *) buf; + id = ntohl(id); + reply = * ((uint32_t *) buf + 1); + reply = ntohl(reply); + + if (id != ID_CODE) { + return -1; + } + + if (payload != NULL) { + *payload = buf + 4 + 4; + *psize = rv - 4 - 4; + } + return reply; +} + +static uint32_t get_rep(struct nmdb_srv *srv, + unsigned char *buf, size_t bsize, + unsigned char **payload, size_t *psize) +{ + if (srv->type == TIPC_CONN) + return tipc_get_rep(srv, buf, bsize, payload, psize); + else if (srv->type == TCP_CONN) + return tcp_get_rep(srv, buf, bsize, payload, psize); + else + return 0; +} + + /* Hash function used internally by select_srv(). See RFC 1071. */ static uint32_t checksum(const unsigned char *buf, size_t bsize) { @@ -245,13 +408,13 @@ static ssize_t do_get(nmdb_t *db, reqsize = 3 * 4 + ksize; srv = select_srv(db, key, ksize); - t = srv_send(db, srv, buf, reqsize); + t = srv_send(srv, buf, reqsize); if (t <= 0) { rv = -1; goto exit; } - reply = get_rep(db, srv, buf, bsize, &p, &psize); + reply = get_rep(srv, buf, bsize, &p, &psize); if (reply == REP_CACHE_MISS || reply == REP_NOTIN) { rv = 0; @@ -333,13 +496,13 @@ static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize, memcpy(p, val, vsize); srv = select_srv(db, key, ksize); - t = srv_send(db, srv, buf, bsize); + t = srv_send(srv, buf, bsize); if (t <= 0) { rv = -1; goto exit; } - reply = get_rep(db, srv, buf, bsize, NULL, NULL); + reply = get_rep(srv, buf, bsize, NULL, NULL); if (reply == REP_OK) { rv = 1; @@ -410,13 +573,13 @@ static int do_del(nmdb_t *db, const unsigned char *key, size_t ksize, memcpy(buf + 3 * 4, key, ksize); srv = select_srv(db, key, ksize); - t = srv_send(db, srv, buf, bsize); + t = srv_send(srv, buf, bsize); if (t <= 0) { rv = -1; goto exit; } - reply = get_rep(db, srv, buf, bsize, NULL, NULL); + reply = get_rep(srv, buf, bsize, NULL, NULL); if (reply == REP_OK) { rv = 1; @@ -491,13 +654,13 @@ static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize, memcpy(p, newval, nvsize); srv = select_srv(db, key, ksize); - t = srv_send(db, srv, buf, bsize); + t = srv_send(srv, buf, bsize); if (t <= 0) { rv = -1; goto exit; } - reply = get_rep(db, srv, buf, bsize, NULL, NULL); + reply = get_rep(srv, buf, bsize, NULL, NULL); if (reply == REP_OK) { rv = 2; diff --git a/libnmdb/nmdb.h b/libnmdb/nmdb.h index 7fc0994..9906033 100644 --- a/libnmdb/nmdb.h +++ b/libnmdb/nmdb.h @@ -5,21 +5,33 @@ #include <sys/types.h> /* socket defines */ #include <sys/socket.h> /* socklen_t */ #include <linux/tipc.h> /* struct sockaddr_tipc */ +#include <netinet/in.h> /* struct sockaddr_in */ struct nmdb_srv { - int port; - struct sockaddr_tipc srvsa; - socklen_t srvlen; + int fd; + int type; + union { + struct { + unsigned int port; + struct sockaddr_tipc srvsa; + socklen_t srvlen; + } tipc; + struct { + struct sockaddr_in srvsa; + socklen_t srvlen; + } tcp; + } info; + unsigned long id; }; typedef struct nmdb_t { - int fd; unsigned int nservers; struct nmdb_srv *servers; } nmdb_t; -nmdb_t *nmdb_init(int port); -int nmdb_add_server(nmdb_t *db, int port); +nmdb_t *nmdb_init(); +int nmdb_add_tipc_server(nmdb_t *db, int port); +int nmdb_add_tcp_server(nmdb_t *db, const char *addr, int port); int nmdb_free(nmdb_t *db); ssize_t nmdb_get(nmdb_t *db, const unsigned char *key, size_t ksize,