author | Alberto Bertogli
<albertito@gmail.com> 2007-06-01 05:49:32 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2007-06-01 05:49:32 UTC |
parent | 67f42aa3cda28b810f23cdbd55e990300634254c |
nmdb/Makefile | +1 | -1 |
nmdb/req.h | +2 | -1 |
nmdb/tcp.c | +26 | -430 |
nmdb/tipc.c | +14 | -433 |
diff --git a/nmdb/Makefile b/nmdb/Makefile index 6b26d6f..a37dac3 100644 --- a/nmdb/Makefile +++ b/nmdb/Makefile @@ -18,7 +18,7 @@ endif PREFIX=/usr/local -OBJS = be-qdbm.o cache.o db.o queue.o net.o tcp.o tipc.o main.o +OBJS = be-qdbm.o cache.o db.o queue.o net.o parse.o tcp.o tipc.o main.o default: all diff --git a/nmdb/req.h b/nmdb/req.h index f8c7810..4c52d59 100644 --- a/nmdb/req.h +++ b/nmdb/req.h @@ -23,10 +23,11 @@ struct req_info { /* operation information */ uint32_t id; uint32_t cmd; - unsigned char *payload; + const unsigned char *payload; size_t psize; /* operations */ + void (*mini_reply)(struct req_info *req, uint32_t reply); void (*reply_err)(struct req_info *req, uint32_t reply); void (*reply_get)(struct req_info *req, uint32_t reply, unsigned char *val, size_t vsize); diff --git a/nmdb/tcp.c b/nmdb/tcp.c index 622611a..cde51f1 100644 --- a/nmdb/tcp.c +++ b/nmdb/tcp.c @@ -6,7 +6,6 @@ #include <stdint.h> /* uint32_t and friends */ #include <arpa/inet.h> /* htonls() and friends */ #include <netinet/in.h> /* INET stuff */ -//#include <netinet/ip.h> /* IP stuff */ #include <netinet/tcp.h> /* TCP stuff */ #include <string.h> /* memcpy() */ #include <unistd.h> /* fcntl() */ @@ -21,9 +20,10 @@ typedef unsigned char u_char; #include "tcp.h" #include "common.h" -#include "queue.h" #include "net-const.h" #include "req.h" +#include "parse.h" + #define printf(...) do { } while (0) @@ -42,19 +42,16 @@ struct tcp_socket { }; static void tcp_recv(int fd, short event, void *arg); -static void parse_msg(struct tcp_socket *tcpsock, struct req_info *req, +static void process_buf(struct tcp_socket *tcpsock, struct req_info *req, unsigned char *buf, size_t len); -static void parse_get(struct req_info *req, int impact_db); -static void parse_set(struct req_info *req, int impact_db, int async); -static void parse_del(struct req_info *req, int impact_db, int async); -static void parse_cas(struct req_info *req, int impact_db); -void tcp_reply_err(struct req_info *req, uint32_t reply); -void tcp_reply_get(struct req_info *req, uint32_t reply, +static void tcp_mini_reply(struct req_info *req, uint32_t reply); +static void tcp_reply_err(struct req_info *req, uint32_t reply); +static void tcp_reply_get(struct req_info *req, uint32_t reply, unsigned char *val, size_t vsize); -void tcp_reply_set(struct req_info *req, uint32_t reply); -void tcp_reply_del(struct req_info *req, uint32_t reply); -void tcp_reply_cas(struct req_info *req, uint32_t reply); +static void tcp_reply_set(struct req_info *req, uint32_t reply); +static void tcp_reply_del(struct req_info *req, uint32_t reply); +static void tcp_reply_cas(struct req_info *req, uint32_t reply); /* @@ -111,7 +108,7 @@ static int rep_send(const struct req_info *req, const unsigned char *buf, /* Send small replies, consisting in only a value. */ -static void mini_reply(struct req_info *req, uint32_t reply) +void tcp_mini_reply(struct req_info *req, uint32_t reply) { /* We use a mini buffer to speedup the small replies, to avoid the * malloc() overhead. */ @@ -128,99 +125,6 @@ static void mini_reply(struct req_info *req, uint32_t reply) } -/* Create a queue entry structure based on the parameters passed. Memory - * allocated here will be free()'d in queue_entry_free(). It's not the - * cleanest way, but the alternatives are even messier. */ -static struct queue_entry *make_queue_long_entry(struct req_info *req, - uint32_t operation, const unsigned char *key, size_t ksize, - const unsigned char *val, size_t vsize, - const unsigned char *newval, size_t nvsize) -{ - struct queue_entry *e; - unsigned char *kcopy, *vcopy, *nvcopy; - - e = queue_entry_create(); - if (e == NULL) { - return NULL; - } - - kcopy = NULL; - if (key != NULL) { - kcopy = malloc(ksize); - if (kcopy == NULL) { - queue_entry_free(e); - return NULL; - } - memcpy(kcopy, key, ksize); - } - - vcopy = NULL; - if (val != NULL) { - vcopy = malloc(vsize); - if (vcopy == NULL) { - queue_entry_free(e); - if (kcopy != NULL) - free(kcopy); - return NULL; - } - memcpy(vcopy, val, vsize); - } - - nvcopy = NULL; - if (newval != NULL) { - nvcopy = malloc(nvsize); - if (nvcopy == NULL) { - queue_entry_free(e); - if (kcopy != NULL) - free(kcopy); - if (vcopy != NULL) - free(vcopy); - return NULL; - } - memcpy(nvcopy, newval, nvsize); - } - - e->operation = operation; - e->key = kcopy; - e->ksize = ksize; - e->val = vcopy; - e->vsize = vsize; - e->newval = nvcopy; - e->nvsize = nvsize; - - /* Create a copy of req, including clisa */ - e->req = malloc(sizeof(struct req_info)); - if (e->req == NULL) { - queue_entry_free(e); - return NULL; - } - memcpy(e->req, req, sizeof(struct req_info)); - - e->req->clisa = malloc(sizeof(struct sockaddr_in)); - if (e->req->clisa == NULL) { - queue_entry_free(e); - return NULL; - } - memcpy(e->req->clisa, req->clisa, sizeof(struct sockaddr_in)); - - /* clear out unused fields */ - e->req->payload = NULL; - e->req->psize = 0; - - return e; -} - -/* Like make_queue_long_entry() but with few parameters because most actions - * do not need newval. */ -static struct queue_entry *make_queue_entry(struct req_info *req, - uint32_t operation, const unsigned char *key, size_t ksize, - const unsigned char *val, size_t vsize) -{ - return make_queue_long_entry(req, operation, key, ksize, val, vsize, - NULL, 0); -} - - void tcp_reply_err(struct req_info *req, uint32_t reply) { rep_send_error(req, reply); @@ -231,7 +135,7 @@ void tcp_reply_get(struct req_info *req, uint32_t reply, { if (val == NULL) { /* miss */ - mini_reply(req, reply); + tcp_mini_reply(req, reply); } else { unsigned char *buf; size_t bsize; @@ -269,18 +173,18 @@ void tcp_reply_get(struct req_info *req, uint32_t reply, void tcp_reply_set(struct req_info *req, uint32_t reply) { - mini_reply(req, reply); + tcp_mini_reply(req, reply); } void tcp_reply_del(struct req_info *req, uint32_t reply) { - mini_reply(req, reply); + tcp_mini_reply(req, reply); } void tcp_reply_cas(struct req_info *req, uint32_t reply) { - mini_reply(req, reply); + tcp_mini_reply(req, reply); } @@ -411,7 +315,7 @@ static void tcp_recv(int fd, short event, void *arg) /* New incoming message */ /* Allocate a little bit more over the max. message size, - * which is 64k; it will be freed by parse_msg(). */ + * which is 64k; it will be freed by process_buf(). */ bsize = 68 * 1024; buf = malloc(bsize); if (buf == NULL) { @@ -430,7 +334,7 @@ static void tcp_recv(int fd, short event, void *arg) goto error_exit; } - /* parse_msg() will take care of freeing this when the time + /* process_buf() will take care of freeing this when the time * comes */ req = malloc(sizeof(struct req_info)); if (req == NULL) { @@ -442,13 +346,14 @@ static void tcp_recv(int fd, short event, void *arg) req->type = REQTYPE_TCP; req->clisa = (struct sockaddr *) &tcpsock->clisa; req->clilen = tcpsock->clilen; + req->mini_reply = tcp_mini_reply; req->reply_err = tcp_reply_err; req->reply_get = tcp_reply_get; req->reply_set = tcp_reply_set; req->reply_del = tcp_reply_del; req->reply_cas = tcp_reply_cas; - parse_msg(tcpsock, req, buf, rv); + process_buf(tcpsock, req, buf, rv); } else { /* We already got a partial message, complete it. */ @@ -465,7 +370,7 @@ static void tcp_recv(int fd, short event, void *arg) tcpsock->len += rv; - parse_msg(tcpsock, tcpsock->req, tcpsock->buf, tcpsock->len); + process_buf(tcpsock, tcpsock->req, tcpsock->buf, tcpsock->len); } return; @@ -478,14 +383,11 @@ error_exit: } -/* Main message parsing and dissecting */ -static void parse_msg(struct tcp_socket *tcpsock, struct req_info *req, +/* Main message unwrapping */ +static void process_buf(struct tcp_socket *tcpsock, struct req_info *req, unsigned char *buf, size_t len) { - uint32_t hdr, id, cmd, totaltoget = 0; - uint8_t ver; - size_t psize; - unsigned char *payload; + uint32_t totaltoget = 0; printf("parse l:%tu tl:%tu tb:%p ts:%tu \n", len, tcpsock->len, tcpsock->buf, tcpsock->pktsize); @@ -521,74 +423,19 @@ static void parse_msg(struct tcp_socket *tcpsock, struct req_info *req, tcpsock->len = len; tcpsock->pktsize = totaltoget; } - return; } + printf("parsing\n"); /* The buffer is complete, parse it as usual. */ - /* The header is: - * 4 bytes Total message length - * 4 bytes Version + ID - * 4 bytes Command - * Variable Payload - */ - - hdr = * ((uint32_t *) buf + 1); - hdr = htonl(hdr); - - /* FIXME: little endian-only */ - ver = (hdr & 0xF0000000) >> 28; - id = hdr & 0x0FFFFFFF; - req->id = id; - - cmd = ntohl(* ((uint32_t *) buf + 2)); - - if (ver != PROTO_VER) { - stats.net_version_mismatch++; - rep_send_error(req, ERR_VER); + if (parse_message(req, buf + 4, len - 4)) { goto exit; + } else { + goto error_exit; } - /* We define payload as the stuff after buf. But be careful because - * there might be none (if len == 1). Doing the pointer arithmetic - * isn't problematic, but accessing the payload should be done only if - * we know we have enough data. */ - payload = buf + 12; - psize = len - 12; - - /* Store the id encoded in network byte order, so that we don't have - * to calculate it at send time. */ - req->id = htonl(id); - req->cmd = cmd; - req->payload = payload; - req->psize = psize; - - if (cmd == REQ_CACHE_GET) - parse_get(req, 0); - else if (cmd == REQ_CACHE_SET) - parse_set(req, 0, 0); - else if (cmd == REQ_CACHE_DEL) - parse_del(req, 0, 0); - else if (cmd == REQ_GET) - parse_get(req, 1); - else if (cmd == REQ_SET_SYNC) - parse_set(req, 1, 0); - else if (cmd == REQ_DEL_SYNC) - parse_del(req, 1, 0); - else if (cmd == REQ_SET_ASYNC) - parse_set(req, 1, 1); - else if (cmd == REQ_DEL_ASYNC) - parse_del(req, 1, 1); - else if (cmd == REQ_CACHE_CAS) - parse_cas(req, 0); - else if (cmd == REQ_CAS) - parse_cas(req, 1); - else { - stats.net_unk_req++; - rep_send_error(req, ERR_UNKREQ); - } exit: printf("pm exit\n"); @@ -607,7 +454,6 @@ exit: return; - error_exit: printf("pm error\n"); printf("t:%p b:%p\n", tcpsock->buf, buf); @@ -630,253 +476,3 @@ error_exit: } -static void parse_get(struct req_info *req, int impact_db) -{ - int hit; - unsigned char *key; - uint32_t ksize; - unsigned char *val = NULL; - size_t vsize = 0; - - ksize = * (uint32_t *) req->payload; - ksize = ntohl(ksize); - if (req->psize < ksize) { - stats.net_broken_req++; - rep_send_error(req, ERR_BROKEN); - return; - } - - key = req->payload + sizeof(uint32_t); - - hit = cache_get(cache_table, key, ksize, &val, &vsize); - - if (!hit && !impact_db) { - mini_reply(req, REP_CACHE_MISS); - return; - } else if (!hit && impact_db) { - struct queue_entry *e; - e = make_queue_entry(req, REQ_GET, key, ksize, NULL, 0); - if (e == NULL) { - rep_send_error(req, ERR_MEM); - return; - } - queue_lock(op_queue); - queue_put(op_queue, e); - queue_unlock(op_queue); - queue_signal(op_queue); - return; - } else { - tcp_reply_get(req, REP_CACHE_HIT, val, vsize); - return; - } -} - - -static void parse_set(struct req_info *req, int impact_db, int async) -{ - int rv; - unsigned char *key, *val; - uint32_t ksize, vsize; - const int max = 65536; - - /* Request format: - * 4 ksize - * 4 vsize - * ksize key - * vsize val - */ - ksize = * (uint32_t *) req->payload; - ksize = ntohl(ksize); - vsize = * ( ((uint32_t *) req->payload) + 1), - vsize = ntohl(vsize); - - /* Sanity check on sizes: - * - ksize and vsize must both be < req->psize - * - ksize and vsize must both be < 2^16 = 64k - * - ksize + vsize < 2^16 = 64k - */ - if ( (req->psize < ksize) || (req->psize < vsize) || - (ksize > max) || (vsize > max) || - ( (ksize + vsize) > max) ) { - stats.net_broken_req++; - rep_send_error(req, ERR_BROKEN); - return; - } - - key = req->payload + sizeof(uint32_t) * 2; - val = key + ksize; - - rv = cache_set(cache_table, key, ksize, val, vsize); - if (!rv) { - rep_send_error(req, ERR_MEM); - return; - } - - if (impact_db) { - struct queue_entry *e; - uint32_t request; - - request = REQ_SET_SYNC; - if (async) - request = REQ_SET_ASYNC; - - e = make_queue_entry(req, request, key, ksize, val, vsize); - if (e == NULL) { - rep_send_error(req, ERR_MEM); - return; - } - queue_lock(op_queue); - queue_put(op_queue, e); - queue_unlock(op_queue); - - if (async) { - mini_reply(req, REP_OK); - } else { - /* Signal the DB thread it has work only if it's a - * synchronous operation, asynchronous don't mind - * waiting. It does have a measurable impact on - * performance (2083847usec vs 2804973usec for sets on - * "test2d 100000 10 10"). */ - queue_signal(op_queue); - } - return; - } else { - mini_reply(req, REP_OK); - } - - return; -} - - -static void parse_del(struct req_info *req, int impact_db, int async) -{ - int hit; - unsigned char *key; - uint32_t ksize; - - ksize = * (uint32_t *) req->payload; - ksize = ntohl(ksize); - if (req->psize < ksize) { - stats.net_broken_req++; - rep_send_error(req, ERR_BROKEN); - return; - } - - key = req->payload + sizeof(uint32_t); - - hit = cache_del(cache_table, key, ksize); - - if (!impact_db && hit) { - mini_reply(req, REP_OK); - } else if (!impact_db && !hit) { - mini_reply(req, REP_NOTIN); - } else if (impact_db) { - struct queue_entry *e; - uint32_t request; - - request = REQ_DEL_SYNC; - if (async) - request = REQ_DEL_ASYNC; - - e = make_queue_entry(req, request, key, ksize, NULL, 0); - if (e == NULL) { - rep_send_error(req, ERR_MEM); - return; - } - queue_lock(op_queue); - queue_put(op_queue, e); - queue_unlock(op_queue); - - if (async) { - mini_reply(req, REP_OK); - } else { - /* See comment on parse_set(). */ - queue_signal(op_queue); - } - - return; - } - - return; -} - -static void parse_cas(struct req_info *req, int impact_db) -{ - int rv; - unsigned char *key, *oldval, *newval; - uint32_t ksize, ovsize, nvsize; - const int max = 65536; - - /* Request format: - * 4 ksize - * 4 ovsize - * 4 nvsize - * ksize key - * ovsize oldval - * nvsize newval - */ - ksize = * (uint32_t *) req->payload; - ksize = ntohl(ksize); - ovsize = * ( ((uint32_t *) req->payload) + 1); - ovsize = ntohl(ovsize); - nvsize = * ( ((uint32_t *) req->payload) + 2); - nvsize = ntohl(nvsize); - - /* Sanity check on sizes: - * - ksize, ovsize and nvsize must all be < req->psize - * - ksize, ovsize and nvsize must all be < 2^16 = 64k - * - ksize + ovsize + mvsize < 2^16 = 64k - */ - if ( (req->psize < ksize) || (req->psize < ovsize) || - (req->psize < nvsize) || - (ksize > max) || (ovsize > max) || - (nvsize > max) || - ( (ksize + ovsize + nvsize) > max) ) { - stats.net_broken_req++; - rep_send_error(req, ERR_BROKEN); - return; - } - - key = req->payload + sizeof(uint32_t) * 3; - oldval = key + ksize; - newval = oldval + ovsize; - - rv = cache_cas(cache_table, key, ksize, oldval, ovsize, - newval, nvsize); - if (rv == 0) { - /* If the cache doesn't match, there is no need to bother the - * DB even if we were asked to impact. */ - mini_reply(req, REP_NOMATCH); - return; - } - - if (!impact_db) { - if (rv == -1) { - mini_reply(req, REP_NOTIN); - return; - } else { - mini_reply(req, REP_OK); - return; - } - } else { - /* impact_db = 1 and the key is either not in the cache, or - * cache_cas() was successful. We now need to queue the CAS in - * the database. */ - struct queue_entry *e; - - e = make_queue_long_entry(req, REQ_CAS, key, ksize, - oldval, ovsize, newval, nvsize); - if (e == NULL) { - rep_send_error(req, ERR_MEM); - return; - } - - queue_lock(op_queue); - queue_put(op_queue, e); - queue_unlock(op_queue); - queue_signal(op_queue); - } - return; -} - - diff --git a/nmdb/tipc.c b/nmdb/tipc.c index d6a6fb0..c1b0059 100644 --- a/nmdb/tipc.c +++ b/nmdb/tipc.c @@ -11,24 +11,18 @@ #include "tipc.h" #include "common.h" -#include "queue.h" #include "net-const.h" #include "req.h" +#include "parse.h" -static void parse_msg(struct req_info *req, unsigned char *buf, - size_t bsize); -static void parse_get(struct req_info *req, int impact_db); -static void parse_set(struct req_info *req, int impact_db, int async); -static void parse_del(struct req_info *req, int impact_db, int async); -static void parse_cas(struct req_info *req, int impact_db); - -void tipc_reply_err(struct req_info *req, uint32_t reply); -void tipc_reply_get(struct req_info *req, uint32_t reply, +static void tipc_mini_reply(struct req_info *req, uint32_t reply); +static void tipc_reply_err(struct req_info *req, uint32_t reply); +static void tipc_reply_get(struct req_info *req, uint32_t reply, unsigned char *val, size_t vsize); -void tipc_reply_set(struct req_info *req, uint32_t reply); -void tipc_reply_del(struct req_info *req, uint32_t reply); -void tipc_reply_cas(struct req_info *req, uint32_t reply); +static void tipc_reply_set(struct req_info *req, uint32_t reply); +static void tipc_reply_del(struct req_info *req, uint32_t reply); +static void tipc_reply_cas(struct req_info *req, uint32_t reply); /* @@ -77,7 +71,7 @@ static int rep_send(const struct req_info *req, const unsigned char *buf, /* Send small replies, consisting in only a value. */ -static void mini_reply(struct req_info *req, uint32_t reply) +static void tipc_mini_reply(struct req_info *req, uint32_t reply) { /* We use a mini buffer to speedup the small replies, to avoid the * malloc() overhead. */ @@ -94,99 +88,6 @@ static void mini_reply(struct req_info *req, uint32_t reply) } -/* Create a queue entry structure based on the parameters passed. Memory - * allocated here will be free()'d in queue_entry_free(). It's not the - * cleanest way, but the alternatives are even messier. */ -static struct queue_entry *make_queue_long_entry(struct req_info *req, - uint32_t operation, const unsigned char *key, size_t ksize, - const unsigned char *val, size_t vsize, - const unsigned char *newval, size_t nvsize) -{ - struct queue_entry *e; - unsigned char *kcopy, *vcopy, *nvcopy; - - e = queue_entry_create(); - if (e == NULL) { - return NULL; - } - - kcopy = NULL; - if (key != NULL) { - kcopy = malloc(ksize); - if (kcopy == NULL) { - queue_entry_free(e); - return NULL; - } - memcpy(kcopy, key, ksize); - } - - vcopy = NULL; - if (val != NULL) { - vcopy = malloc(vsize); - if (vcopy == NULL) { - queue_entry_free(e); - if (kcopy != NULL) - free(kcopy); - return NULL; - } - memcpy(vcopy, val, vsize); - } - - nvcopy = NULL; - if (newval != NULL) { - nvcopy = malloc(nvsize); - if (nvcopy == NULL) { - queue_entry_free(e); - if (kcopy != NULL) - free(kcopy); - if (vcopy != NULL) - free(vcopy); - return NULL; - } - memcpy(nvcopy, newval, nvsize); - } - - e->operation = operation; - e->key = kcopy; - e->ksize = ksize; - e->val = vcopy; - e->vsize = vsize; - e->newval = nvcopy; - e->nvsize = nvsize; - - /* Create a copy of req, including clisa */ - e->req = malloc(sizeof(struct req_info)); - if (e->req == NULL) { - queue_entry_free(e); - return NULL; - } - memcpy(e->req, req, sizeof(struct req_info)); - - e->req->clisa = malloc(sizeof(struct sockaddr_tipc)); - if (e->req->clisa == NULL) { - queue_entry_free(e); - return NULL; - } - memcpy(e->req->clisa, req->clisa, sizeof(struct sockaddr_tipc)); - - /* clear out unused fields */ - e->req->payload = NULL; - e->req->psize = 0; - - return e; -} - -/* Like make_queue_long_entry() but with few parameters because most actions - * do not need newval. */ -static struct queue_entry *make_queue_entry(struct req_info *req, - uint32_t operation, const unsigned char *key, size_t ksize, - const unsigned char *val, size_t vsize) -{ - return make_queue_long_entry(req, operation, key, ksize, val, vsize, - NULL, 0); -} - - /* The tipc_reply_* functions are used by the db code to send the network * replies. */ @@ -200,7 +101,7 @@ void tipc_reply_get(struct req_info *req, uint32_t reply, { if (val == NULL) { /* miss */ - mini_reply(req, reply); + tipc_mini_reply(req, reply); } else { unsigned char *buf; size_t bsize; @@ -234,18 +135,18 @@ void tipc_reply_get(struct req_info *req, uint32_t reply, void tipc_reply_set(struct req_info *req, uint32_t reply) { - mini_reply(req, reply); + tipc_mini_reply(req, reply); } void tipc_reply_del(struct req_info *req, uint32_t reply) { - mini_reply(req, reply); + tipc_mini_reply(req, reply); } void tipc_reply_cas(struct req_info *req, uint32_t reply) { - mini_reply(req, reply); + tipc_mini_reply(req, reply); } @@ -325,6 +226,7 @@ void tipc_recv(int fd, short event, void *arg) req.type = REQTYPE_TIPC; req.clisa = (struct sockaddr *) &clisa; req.clilen = clilen; + req.mini_reply = tipc_mini_reply; req.reply_err = tipc_reply_err; req.reply_get = tipc_reply_get; req.reply_set = tipc_reply_set; @@ -332,331 +234,10 @@ void tipc_recv(int fd, short event, void *arg) req.reply_cas = tipc_reply_cas; /* parse the message */ - parse_msg(&req, buf, rv); + parse_message(&req, buf, rv); exit: return; } -/* Main message parsing and dissecting */ -static void parse_msg(struct req_info *req, unsigned char *buf, size_t bsize) -{ - uint32_t hdr, id, cmd; - uint8_t ver; - size_t psize; - unsigned char *payload; - - hdr = * (uint32_t *) buf; - hdr = htonl(hdr); - - /* FIXME: little endian-only */ - ver = (hdr & 0xF0000000) >> 28; - id = hdr & 0x0FFFFFFF; - req->id = id; - - cmd = ntohl(* ((uint32_t *) buf + 1)); - - if (ver != PROTO_VER) { - stats.net_version_mismatch++; - rep_send_error(req, ERR_VER); - return; - } - - /* We define payload as the stuff after buf. But be careful because - * there might be none (if bsize == 1). Doing the pointer arithmetic - * isn't problematic, but accessing the payload should be done only if - * we know we have enough data. */ - payload = buf + 8; - psize = bsize - 8; - - /* Store the id encoded in network byte order, so that we don't have - * to calculate it at send time. */ - req->id = htonl(id); - req->cmd = cmd; - req->payload = payload; - req->psize = psize; - - if (cmd == REQ_CACHE_GET) - parse_get(req, 0); - else if (cmd == REQ_CACHE_SET) - parse_set(req, 0, 0); - else if (cmd == REQ_CACHE_DEL) - parse_del(req, 0, 0); - else if (cmd == REQ_GET) - parse_get(req, 1); - else if (cmd == REQ_SET_SYNC) - parse_set(req, 1, 0); - else if (cmd == REQ_DEL_SYNC) - parse_del(req, 1, 0); - else if (cmd == REQ_SET_ASYNC) - parse_set(req, 1, 1); - else if (cmd == REQ_DEL_ASYNC) - parse_del(req, 1, 1); - else if (cmd == REQ_CACHE_CAS) - parse_cas(req, 0); - else if (cmd == REQ_CAS) - parse_cas(req, 1); - else { - stats.net_unk_req++; - rep_send_error(req, ERR_UNKREQ); - return; - } - - return; -} - - -static void parse_get(struct req_info *req, int impact_db) -{ - int hit; - unsigned char *key; - uint32_t ksize; - unsigned char *val = NULL; - size_t vsize = 0; - - if (settings.passive) - return; - - ksize = * (uint32_t *) req->payload; - ksize = ntohl(ksize); - if (req->psize < ksize) { - stats.net_broken_req++; - rep_send_error(req, ERR_BROKEN); - return; - } - - key = req->payload + sizeof(uint32_t); - - hit = cache_get(cache_table, key, ksize, &val, &vsize); - - if (!hit && !impact_db) { - mini_reply(req, REP_CACHE_MISS); - return; - } else if (!hit && impact_db) { - struct queue_entry *e; - e = make_queue_entry(req, REQ_GET, key, ksize, NULL, 0); - if (e == NULL) { - rep_send_error(req, ERR_MEM); - return; - } - queue_lock(op_queue); - queue_put(op_queue, e); - queue_unlock(op_queue); - queue_signal(op_queue); - return; - } else { - tipc_reply_get(req, REP_CACHE_HIT, val, vsize); - return; - } -} - - -static void parse_set(struct req_info *req, int impact_db, int async) -{ - int rv; - unsigned char *key, *val; - uint32_t ksize, vsize; - const int max = 65536; - - /* Request format: - * 4 ksize - * 4 vsize - * ksize key - * vsize val - */ - ksize = * (uint32_t *) req->payload; - ksize = ntohl(ksize); - vsize = * ( ((uint32_t *) req->payload) + 1), - vsize = ntohl(vsize); - - /* Sanity check on sizes: - * - ksize and vsize must both be < req->psize - * - ksize and vsize must both be < 2^16 = 64k - * - ksize + vsize < 2^16 = 64k - */ - if ( (req->psize < ksize) || (req->psize < vsize) || - (ksize > max) || (vsize > max) || - ( (ksize + vsize) > max) ) { - stats.net_broken_req++; - rep_send_error(req, ERR_BROKEN); - return; - } - - key = req->payload + sizeof(uint32_t) * 2; - val = key + ksize; - - rv = cache_set(cache_table, key, ksize, val, vsize); - if (!rv) { - rep_send_error(req, ERR_MEM); - return; - } - - if (impact_db) { - struct queue_entry *e; - uint32_t request; - - request = REQ_SET_SYNC; - if (async) - request = REQ_SET_ASYNC; - - e = make_queue_entry(req, request, key, ksize, val, vsize); - if (e == NULL) { - rep_send_error(req, ERR_MEM); - return; - } - queue_lock(op_queue); - queue_put(op_queue, e); - queue_unlock(op_queue); - - if (async) { - mini_reply(req, REP_OK); - } else { - /* Signal the DB thread it has work only if it's a - * synchronous operation, asynchronous don't mind - * waiting. It does have a measurable impact on - * performance (2083847usec vs 2804973usec for sets on - * "test2d 100000 10 10"). */ - queue_signal(op_queue); - } - return; - } else { - mini_reply(req, REP_OK); - } - - return; -} - - -static void parse_del(struct req_info *req, int impact_db, int async) -{ - int hit; - unsigned char *key; - uint32_t ksize; - - ksize = * (uint32_t *) req->payload; - ksize = ntohl(ksize); - if (req->psize < ksize) { - stats.net_broken_req++; - rep_send_error(req, ERR_BROKEN); - return; - } - - key = req->payload + sizeof(uint32_t); - - hit = cache_del(cache_table, key, ksize); - - if (!impact_db && hit) { - mini_reply(req, REP_OK); - } else if (!impact_db && !hit) { - mini_reply(req, REP_NOTIN); - } else if (impact_db) { - struct queue_entry *e; - uint32_t request; - - request = REQ_DEL_SYNC; - if (async) - request = REQ_DEL_ASYNC; - - e = make_queue_entry(req, request, key, ksize, NULL, 0); - if (e == NULL) { - rep_send_error(req, ERR_MEM); - return; - } - queue_lock(op_queue); - queue_put(op_queue, e); - queue_unlock(op_queue); - - if (async) { - mini_reply(req, REP_OK); - } else { - /* See comment on parse_set(). */ - queue_signal(op_queue); - } - - return; - } - - return; -} - -static void parse_cas(struct req_info *req, int impact_db) -{ - int rv; - unsigned char *key, *oldval, *newval; - uint32_t ksize, ovsize, nvsize; - const int max = 65536; - - /* Request format: - * 4 ksize - * 4 ovsize - * 4 nvsize - * ksize key - * ovsize oldval - * nvsize newval - */ - ksize = * (uint32_t *) req->payload; - ksize = ntohl(ksize); - ovsize = * ( ((uint32_t *) req->payload) + 1); - ovsize = ntohl(ovsize); - nvsize = * ( ((uint32_t *) req->payload) + 2); - nvsize = ntohl(nvsize); - - /* Sanity check on sizes: - * - ksize, ovsize and nvsize must all be < req->psize - * - ksize, ovsize and nvsize must all be < 2^16 = 64k - * - ksize + ovsize + mvsize < 2^16 = 64k - */ - if ( (req->psize < ksize) || (req->psize < ovsize) || - (req->psize < nvsize) || - (ksize > max) || (ovsize > max) || - (nvsize > max) || - ( (ksize + ovsize + nvsize) > max) ) { - stats.net_broken_req++; - rep_send_error(req, ERR_BROKEN); - return; - } - - key = req->payload + sizeof(uint32_t) * 3; - oldval = key + ksize; - newval = oldval + ovsize; - - rv = cache_cas(cache_table, key, ksize, oldval, ovsize, - newval, nvsize); - if (rv == 0) { - /* If the cache doesn't match, there is no need to bother the - * DB even if we were asked to impact. */ - mini_reply(req, REP_NOMATCH); - return; - } - - if (!impact_db) { - if (rv == -1) { - mini_reply(req, REP_NOTIN); - return; - } else { - mini_reply(req, REP_OK); - return; - } - } else { - /* impact_db = 1 and the key is either not in the cache, or - * cache_cas() was successful. We now need to queue the CAS in - * the database. */ - struct queue_entry *e; - - e = make_queue_long_entry(req, REQ_CAS, key, ksize, - oldval, ovsize, newval, nvsize); - if (e == NULL) { - rep_send_error(req, ERR_MEM); - return; - } - - queue_lock(op_queue); - queue_put(op_queue, e); - queue_unlock(op_queue); - queue_signal(op_queue); - } - return; -} - -