author | Alberto Bertogli
<albertito@gmail.com> 2007-06-04 02:06:20 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2007-06-04 02:06:20 UTC |
parent | a418111dc0019dc4d3f82ff1879de93ee9ff1597 |
libnmdb/internal.h | +5 | -0 |
libnmdb/libnmdb.c | +68 | -40 |
libnmdb/tcp.c | +47 | -21 |
diff --git a/libnmdb/internal.h b/libnmdb/internal.h index 8df2b47..fac8d8e 100644 --- a/libnmdb/internal.h +++ b/libnmdb/internal.h @@ -11,6 +11,11 @@ * * are implemented. */ #define ID_CODE 1 +/* For a given buffer, how much into it should the generic library code write + * the message contents. */ +#define TIPC_MSG_OFFSET 0 +#define TCP_MSG_OFFSET 4 + /* Functions used internally but shared among the different files. */ int compare_servers(const void *s1, const void *s2); ssize_t srecv(int fd, unsigned char *buf, size_t count, int flags); diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c index 11fa73f..e3fd173 100644 --- a/libnmdb/libnmdb.c +++ b/libnmdb/libnmdb.c @@ -174,6 +174,19 @@ static uint32_t get_rep(struct nmdb_srv *srv, return 0; } +static int srv_get_msg_offset(struct nmdb_srv *srv) +{ + if (srv == NULL) + return 0; + + if (srv->type == TIPC_CONN) + return TIPC_MSG_OFFSET; + else if (srv->type == TCP_CONN) + return TCP_MSG_OFFSET; + else + return 0; +} + /* Hash function used internally by select_srv(). See RFC 1071. */ static uint32_t checksum(const unsigned char *buf, size_t bsize) @@ -212,6 +225,7 @@ static ssize_t do_get(nmdb_t *db, const unsigned char *key, size_t ksize, unsigned char *val, size_t vsize, int impact_db) { + int moff; ssize_t rv, t; unsigned char *buf, *p; size_t bsize, reqsize, psize = 0; @@ -224,29 +238,32 @@ static ssize_t do_get(nmdb_t *db, request = REQ_CACHE_GET; } + srv = select_srv(db, key, ksize); + moff = srv_get_msg_offset(srv); + /* Use the same buffer for the request and the reply. * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, * ksize bytes key. * Reply: 4 bytes id, 4 bytes reply code, 4 bytes vsize, * vsize bytes key. * - * We don't know vsize beforehand, but we do know TIPC's max packet is - * 66000. We malloc 70k just in case. + * We don't know vsize beforehand, but we do know our max packet size + * is 64kb. We malloc 68kb just in case. */ - bsize = 70 * 1024; + bsize = 68 * 1024; buf = malloc(bsize); if (buf == NULL) return -1; - * (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE ); - * ((uint32_t *) buf + 1) = htonl(request); - * ((uint32_t *) buf + 2) = htonl(ksize); - p = buf + 3 * 4; - memcpy(p, key, ksize); + p = buf + moff; + + * (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE ); + * ((uint32_t *) p + 1) = htonl(request); + * ((uint32_t *) p + 2) = htonl(ksize); + memcpy(p + 3 * 4, key, ksize); reqsize = 3 * 4 + ksize; - srv = select_srv(db, key, ksize); - t = srv_send(srv, buf, reqsize); + t = srv_send(srv, buf, moff + reqsize); if (t <= 0) { rv = -1; goto exit; @@ -298,6 +315,7 @@ static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *val, size_t vsize, int impact_db, int async) { + int moff; ssize_t rv, t; unsigned char *buf, *p; size_t bsize; @@ -313,27 +331,28 @@ static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize, request = REQ_CACHE_SET; } + srv = select_srv(db, key, ksize); + moff = srv_get_msg_offset(srv); /* Use the same buffer for the request and the reply. * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, 4 * bytes vsize, ksize bytes key, vsize bytes val. * Reply: 4 bytes id, 4 bytes reply code. */ - bsize = 4 + 4 + 4 + 4 + ksize + vsize; + bsize = moff + 4 + 4 + 4 + 4 + ksize + vsize; buf = malloc(bsize); if (buf == NULL) return -1; - * (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE ); - * ((uint32_t *) buf + 1) = htonl(request); - * ((uint32_t *) buf + 2) = htonl(ksize); - * ((uint32_t *) buf + 3) = htonl(vsize); - p = buf + 4 * 4; - memcpy(p, key, ksize); - p += ksize; - memcpy(p, val, vsize); + p = buf + moff; + + * (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE ); + * ((uint32_t *) p + 1) = htonl(request); + * ((uint32_t *) p + 2) = htonl(ksize); + * ((uint32_t *) p + 3) = htonl(vsize); + memcpy(p + 4 * 4, key, ksize); + memcpy(p + 4 * 4 + ksize, val, vsize); - srv = select_srv(db, key, ksize); t = srv_send(srv, buf, bsize); if (t <= 0) { rv = -1; @@ -379,8 +398,9 @@ int nmdb_cache_set(nmdb_t *db, const unsigned char *key, size_t ksize, static int do_del(nmdb_t *db, const unsigned char *key, size_t ksize, int impact_db, int async) { + int moff; ssize_t rv, t; - unsigned char *buf; + unsigned char *buf, *p; size_t bsize; uint32_t request, reply; struct nmdb_srv *srv; @@ -394,23 +414,26 @@ static int do_del(nmdb_t *db, const unsigned char *key, size_t ksize, request = REQ_CACHE_DEL; } + srv = select_srv(db, key, ksize); + moff = srv_get_msg_offset(srv); /* Use the same buffer for the request and the reply. * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, * ksize bytes key. * Reply: 4 bytes id, 4 bytes reply code. */ - bsize = 8 + 4 + ksize; + bsize = moff + 8 + 4 + ksize; buf = malloc(bsize); if (buf == NULL) return -1; - * (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE ); - * ((uint32_t *) buf + 1) = htonl(request); - * ((uint32_t *) buf + 2) = htonl(ksize); - memcpy(buf + 3 * 4, key, ksize); + p = buf + moff; + + * (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE ); + * ((uint32_t *) p + 1) = htonl(request); + * ((uint32_t *) p + 2) = htonl(ksize); + memcpy(p + 3 * 4, key, ksize); - srv = select_srv(db, key, ksize); t = srv_send(srv, buf, bsize); if (t <= 0) { rv = -1; @@ -457,8 +480,9 @@ static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *newval, size_t nvsize, int impact_db) { + int moff; ssize_t rv, t; - unsigned char *buf, *p; + unsigned char *buf, *p, *q; size_t bsize; uint32_t request, reply; struct nmdb_srv *srv; @@ -467,6 +491,8 @@ static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize, if (impact_db) request = REQ_CAS; + srv = select_srv(db, key, ksize); + moff = srv_get_msg_offset(srv); /* Use the same buffer for the request and the reply. * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, 4 @@ -474,22 +500,24 @@ static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize, * ovsize bytes oldval, nvsize bytes newval. * Reply: 4 bytes id, 4 bytes reply code. */ - bsize = 4 + 4 + 4 + 4 + 4 + ksize + ovsize + nvsize; + bsize = moff + 4 + 4 + 4 + 4 + 4 + ksize + ovsize + nvsize; buf = malloc(bsize); if (buf == NULL) return -1; - * (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE ); - * ((uint32_t *) buf + 1) = htonl(request); - * ((uint32_t *) buf + 2) = htonl(ksize); - * ((uint32_t *) buf + 3) = htonl(ovsize); - * ((uint32_t *) buf + 4) = htonl(nvsize); - p = buf + 5 * 4; - memcpy(p, key, ksize); - p += ksize; - memcpy(p, oldval, ovsize); - p += ovsize; - memcpy(p, newval, nvsize); + p = buf + moff; + + * (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE ); + * ((uint32_t *) p + 1) = htonl(request); + * ((uint32_t *) p + 2) = htonl(ksize); + * ((uint32_t *) p + 3) = htonl(ovsize); + * ((uint32_t *) p + 4) = htonl(nvsize); + q = p + 5 * 4; + memcpy(q, key, ksize); + q += ksize; + memcpy(q, oldval, ovsize); + q += ovsize; + memcpy(q, newval, nvsize); srv = select_srv(db, key, ksize); t = srv_send(srv, buf, bsize); diff --git a/libnmdb/tcp.c b/libnmdb/tcp.c index 1f03f1f..6e701ed 100644 --- a/libnmdb/tcp.c +++ b/libnmdb/tcp.c @@ -103,15 +103,13 @@ int nmdb_add_tcp_server(nmdb_t *db, const char *addr, int port) } int tcp_srv_send(struct nmdb_srv *srv, - const unsigned char *buf, size_t bsize) + unsigned char *buf, size_t bsize) { ssize_t rv; uint32_t len; - len = htonl(bsize + 4); - rv = ssend(srv->fd, (unsigned char *) &len, 4, 0); - if (rv != 4) - return 0; + len = htonl(bsize); + memcpy(buf, (const void *) &len, 4); rv = ssend(srv->fd, buf, bsize, 0); if (rv != bsize) @@ -119,30 +117,58 @@ int tcp_srv_send(struct nmdb_srv *srv, return 1; } +static ssize_t recv_msg(int fd, unsigned char *buf, size_t bsize) +{ + ssize_t rv, t; + uint32_t msgsize; + + rv = recv(fd, buf, bsize, 0); + if (rv <= 0) + return rv; + + if (rv < 4) { + t = srecv(fd, buf + rv, 4 - rv, 0); + if (t <= 0) { + return t; + } + + rv = rv + t; + } + + msgsize = * ((uint32_t *) buf); + msgsize = ntohl(msgsize); + + if (msgsize > bsize) + return -1; + + if (rv < msgsize) { + t = srecv(fd, buf + rv, msgsize - rv, 0); + if (t <= 0) { + return t; + } + + rv = rv + t; + } + + return rv; +} + + /* Used internally to get and parse replies from the server. */ uint32_t tcp_get_rep(struct nmdb_srv *srv, unsigned char *buf, size_t bsize, unsigned char **payload, size_t *psize) { ssize_t rv; - uint32_t id, reply, msgsize; + uint32_t id, reply; - rv = srecv(srv->fd, (unsigned char *) &msgsize, 4, 0); - if (rv != 4) + rv = recv_msg(srv->fd, buf, bsize); + if (rv <= 0) return -1; - msgsize = ntohl(msgsize); - if (bsize < msgsize) - return -1; - - rv = srecv(srv->fd, buf, msgsize - 4, 0); - if (rv != msgsize - 4) { - return -1; - } - - id = * (uint32_t *) buf; + id = * ((uint32_t *) buf + 1); id = ntohl(id); - reply = * ((uint32_t *) buf + 1); + reply = * ((uint32_t *) buf + 2); reply = ntohl(reply); if (id != ID_CODE) { @@ -150,8 +176,8 @@ uint32_t tcp_get_rep(struct nmdb_srv *srv, } if (payload != NULL) { - *payload = buf + 4 + 4; - *psize = rv - 4 - 4; + *payload = buf + 4 + 4 + 4; + *psize = rv - 4 - 4 - 4; } return reply; }