author | Alberto Bertogli
<albertito@gmail.com> 2007-06-01 06:14:51 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2007-06-01 06:14:51 UTC |
parent | b2c19e00dcbbfa9ce20fbd1f9a0e84826d0fb826 |
nmdb/parse.c | +439 | -0 |
nmdb/parse.h | +11 | -0 |
diff --git a/nmdb/parse.c b/nmdb/parse.c new file mode 100644 index 0000000..1acd07f --- /dev/null +++ b/nmdb/parse.c @@ -0,0 +1,439 @@ + +#include <stdlib.h> /* malloc() */ +#include <stdint.h> /* uint32_t and friends */ +#include <string.h> /* memcpy() */ +#include <arpa/inet.h> /* htonl() and friends */ + + +#include "parse.h" +#include "req.h" +#include "queue.h" +#include "net-const.h" +#include "common.h" + + +static void parse_get(struct req_info *req, int impact_db); +static void parse_set(struct req_info *req, int impact_db, int async); +static void parse_del(struct req_info *req, int impact_db, int async); +static void parse_cas(struct req_info *req, int impact_db); + + +/* Create a queue entry structure based on the parameters passed. Memory + * allocated here will be free()'d in queue_entry_free(). It's not the + * cleanest way, but the alternatives are even messier. */ +static struct queue_entry *make_queue_long_entry(struct req_info *req, + uint32_t operation, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize, + const unsigned char *newval, size_t nvsize) +{ + struct queue_entry *e; + unsigned char *kcopy, *vcopy, *nvcopy; + + e = queue_entry_create(); + if (e == NULL) { + return NULL; + } + + kcopy = NULL; + if (key != NULL) { + kcopy = malloc(ksize); + if (kcopy == NULL) { + queue_entry_free(e); + return NULL; + } + memcpy(kcopy, key, ksize); + } + + vcopy = NULL; + if (val != NULL) { + vcopy = malloc(vsize); + if (vcopy == NULL) { + queue_entry_free(e); + if (kcopy != NULL) + free(kcopy); + return NULL; + } + memcpy(vcopy, val, vsize); + } + + nvcopy = NULL; + if (newval != NULL) { + nvcopy = malloc(nvsize); + if (nvcopy == NULL) { + queue_entry_free(e); + if (kcopy != NULL) + free(kcopy); + if (vcopy != NULL) + free(vcopy); + return NULL; + } + memcpy(nvcopy, newval, nvsize); + } + + e->operation = operation; + e->key = kcopy; + e->ksize = ksize; + e->val = vcopy; + e->vsize = vsize; + e->newval = nvcopy; + e->nvsize = nvsize; + + /* Create a copy of req, including clisa */ + e->req = malloc(sizeof(struct req_info)); + if (e->req == NULL) { + queue_entry_free(e); + return NULL; + } + memcpy(e->req, req, sizeof(struct req_info)); + + e->req->clisa = malloc(req->clilen); + if (e->req->clisa == NULL) { + queue_entry_free(e); + return NULL; + } + memcpy(e->req->clisa, req->clisa, req->clilen); + + /* clear out unused fields */ + e->req->payload = NULL; + e->req->psize = 0; + + return e; +} + +/* Like make_queue_long_entry() but with few parameters because most actions + * do not need newval. */ +static struct queue_entry *make_queue_entry(struct req_info *req, + uint32_t operation, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize) +{ + return make_queue_long_entry(req, operation, key, ksize, val, vsize, + NULL, 0); +} + + +/* Parse an incoming message. Note that the protocol might have sent this + * directly over the network (ie. TIPC) or might have wrapped it around (ie. + * TCP). Here we only deal with the clean, stripped, non protocol-specific + * message. */ +int parse_message(struct req_info *req, + const unsigned char *buf, size_t len) +{ + uint32_t hdr, ver, id, cmd; + const unsigned char *payload; + size_t psize; + + /* The header is: + * 4 bytes Version + ID + * 4 bytes Command + * Variable Payload + */ + + hdr = * ((uint32_t *) buf); + hdr = htonl(hdr); + + /* FIXME: little endian-only */ + ver = (hdr & 0xF0000000) >> 28; + id = hdr & 0x0FFFFFFF; + req->id = id; + + cmd = ntohl(* ((uint32_t *) buf + 1)); + + if (ver != PROTO_VER) { + stats.net_version_mismatch++; + req->reply_err(req, ERR_VER); + return 0; + } + + /* We define payload as the stuff after buf. But be careful because + * there might be none (if len == 1). Doing the pointer arithmetic + * isn't problematic, but accessing the payload should be done only if + * we know we have enough data. */ + payload = buf + 8; + psize = len - 8; + + /* Store the id encoded in network byte order, so that we don't have + * to calculate it at send time. */ + req->id = htonl(id); + req->cmd = cmd; + req->payload = payload; + req->psize = psize; + + if (cmd == REQ_CACHE_GET) + parse_get(req, 0); + else if (cmd == REQ_CACHE_SET) + parse_set(req, 0, 0); + else if (cmd == REQ_CACHE_DEL) + parse_del(req, 0, 0); + else if (cmd == REQ_GET) + parse_get(req, 1); + else if (cmd == REQ_SET_SYNC) + parse_set(req, 1, 0); + else if (cmd == REQ_DEL_SYNC) + parse_del(req, 1, 0); + else if (cmd == REQ_SET_ASYNC) + parse_set(req, 1, 1); + else if (cmd == REQ_DEL_ASYNC) + parse_del(req, 1, 1); + else if (cmd == REQ_CACHE_CAS) + parse_cas(req, 0); + else if (cmd == REQ_CAS) + parse_cas(req, 1); + else { + stats.net_unk_req++; + req->reply_err(req, ERR_UNKREQ); + } + + return 1; +} + + +static void parse_get(struct req_info *req, int impact_db) +{ + int hit; + const unsigned char *key; + uint32_t ksize; + unsigned char *val = NULL; + size_t vsize = 0; + + ksize = * (uint32_t *) req->payload; + ksize = ntohl(ksize); + if (req->psize < ksize) { + stats.net_broken_req++; + req->reply_err(req, ERR_BROKEN); + return; + } + + key = req->payload + sizeof(uint32_t); + + hit = cache_get(cache_table, key, ksize, &val, &vsize); + + if (!hit && !impact_db) { + req->mini_reply(req, REP_CACHE_MISS); + return; + } else if (!hit && impact_db) { + struct queue_entry *e; + e = make_queue_entry(req, REQ_GET, key, ksize, NULL, 0); + if (e == NULL) { + req->reply_err(req, ERR_MEM); + return; + } + queue_lock(op_queue); + queue_put(op_queue, e); + queue_unlock(op_queue); + queue_signal(op_queue); + return; + } else { + req->reply_get(req, REP_CACHE_HIT, val, vsize); + return; + } +} + + +static void parse_set(struct req_info *req, int impact_db, int async) +{ + int rv; + const unsigned char *key, *val; + uint32_t ksize, vsize; + const int max = 65536; + + /* Request format: + * 4 ksize + * 4 vsize + * ksize key + * vsize val + */ + ksize = * (uint32_t *) req->payload; + ksize = ntohl(ksize); + vsize = * ( ((uint32_t *) req->payload) + 1), + vsize = ntohl(vsize); + + /* Sanity check on sizes: + * - ksize and vsize must both be < req->psize + * - ksize and vsize must both be < 2^16 = 64k + * - ksize + vsize < 2^16 = 64k + */ + if ( (req->psize < ksize) || (req->psize < vsize) || + (ksize > max) || (vsize > max) || + ( (ksize + vsize) > max) ) { + stats.net_broken_req++; + req->reply_err(req, ERR_BROKEN); + return; + } + + key = req->payload + sizeof(uint32_t) * 2; + val = key + ksize; + + rv = cache_set(cache_table, key, ksize, val, vsize); + if (!rv) { + req->reply_err(req, ERR_MEM); + return; + } + + if (impact_db) { + struct queue_entry *e; + uint32_t request; + + request = REQ_SET_SYNC; + if (async) + request = REQ_SET_ASYNC; + + e = make_queue_entry(req, request, key, ksize, val, vsize); + if (e == NULL) { + req->reply_err(req, ERR_MEM); + return; + } + queue_lock(op_queue); + queue_put(op_queue, e); + queue_unlock(op_queue); + + if (async) { + req->mini_reply(req, REP_OK); + } else { + /* Signal the DB thread it has work only if it's a + * synchronous operation, asynchronous don't mind + * waiting. It does have a measurable impact on + * performance (2083847usec vs 2804973usec for sets on + * "test2d 100000 10 10"). */ + queue_signal(op_queue); + } + return; + } else { + req->mini_reply(req, REP_OK); + } + + return; +} + + +static void parse_del(struct req_info *req, int impact_db, int async) +{ + int hit; + const unsigned char *key; + uint32_t ksize; + + ksize = * (uint32_t *) req->payload; + ksize = ntohl(ksize); + if (req->psize < ksize) { + stats.net_broken_req++; + req->reply_err(req, ERR_BROKEN); + return; + } + + key = req->payload + sizeof(uint32_t); + + hit = cache_del(cache_table, key, ksize); + + if (!impact_db && hit) { + req->mini_reply(req, REP_OK); + } else if (!impact_db && !hit) { + req->mini_reply(req, REP_NOTIN); + } else if (impact_db) { + struct queue_entry *e; + uint32_t request; + + request = REQ_DEL_SYNC; + if (async) + request = REQ_DEL_ASYNC; + + e = make_queue_entry(req, request, key, ksize, NULL, 0); + if (e == NULL) { + req->reply_err(req, ERR_MEM); + return; + } + queue_lock(op_queue); + queue_put(op_queue, e); + queue_unlock(op_queue); + + if (async) { + req->mini_reply(req, REP_OK); + } else { + /* See comment on parse_set(). */ + queue_signal(op_queue); + } + + return; + } + + return; +} + +static void parse_cas(struct req_info *req, int impact_db) +{ + int rv; + const unsigned char *key, *oldval, *newval; + uint32_t ksize, ovsize, nvsize; + const int max = 65536; + + /* Request format: + * 4 ksize + * 4 ovsize + * 4 nvsize + * ksize key + * ovsize oldval + * nvsize newval + */ + ksize = * (uint32_t *) req->payload; + ksize = ntohl(ksize); + ovsize = * ( ((uint32_t *) req->payload) + 1); + ovsize = ntohl(ovsize); + nvsize = * ( ((uint32_t *) req->payload) + 2); + nvsize = ntohl(nvsize); + + /* Sanity check on sizes: + * - ksize, ovsize and nvsize must all be < req->psize + * - ksize, ovsize and nvsize must all be < 2^16 = 64k + * - ksize + ovsize + mvsize < 2^16 = 64k + */ + if ( (req->psize < ksize) || (req->psize < ovsize) || + (req->psize < nvsize) || + (ksize > max) || (ovsize > max) || + (nvsize > max) || + ( (ksize + ovsize + nvsize) > max) ) { + stats.net_broken_req++; + req->reply_err(req, ERR_BROKEN); + return; + } + + key = req->payload + sizeof(uint32_t) * 3; + oldval = key + ksize; + newval = oldval + ovsize; + + rv = cache_cas(cache_table, key, ksize, oldval, ovsize, + newval, nvsize); + if (rv == 0) { + /* If the cache doesn't match, there is no need to bother the + * DB even if we were asked to impact. */ + req->mini_reply(req, REP_NOMATCH); + return; + } + + if (!impact_db) { + if (rv == -1) { + req->mini_reply(req, REP_NOTIN); + return; + } else { + req->mini_reply(req, REP_OK); + return; + } + } else { + /* impact_db = 1 and the key is either not in the cache, or + * cache_cas() was successful. We now need to queue the CAS in + * the database. */ + struct queue_entry *e; + + e = make_queue_long_entry(req, REQ_CAS, key, ksize, + oldval, ovsize, newval, nvsize); + if (e == NULL) { + req->reply_err(req, ERR_MEM); + return; + } + + queue_lock(op_queue); + queue_put(op_queue, e); + queue_unlock(op_queue); + queue_signal(op_queue); + } + return; +} + + diff --git a/nmdb/parse.h b/nmdb/parse.h new file mode 100644 index 0000000..b27cf40 --- /dev/null +++ b/nmdb/parse.h @@ -0,0 +1,11 @@ + +#ifndef _PARSE_H +#define _PARSE_H + +#include "req.h" + +int parse_message(struct req_info *req, + const unsigned char *buf, size_t len); + +#endif +