author | Alberto Bertogli
<albertito@gmail.com> 2008-06-04 05:15:24 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2008-06-04 18:17:56 UTC |
parent | ea5ce9834b1a3f5cd9607a7567d83188682fb8ac |
doc/design.rst | +15 | -32 |
doc/network.rst | +44 | -28 |
libnmdb/libnmdb.c | +39 | -58 |
libnmdb/net-const.h | +10 | -13 |
libnmdb/nmdb.skel.h | +8 | -0 |
nmdb/dbloop.c | +8 | -8 |
nmdb/net-const.h | +10 | -13 |
nmdb/parse.c | +74 | -80 |
nmdb/req.h | +1 | -0 |
diff --git a/doc/design.rst b/doc/design.rst index 04ba8d1..751d76f 100644 --- a/doc/design.rst +++ b/doc/design.rst @@ -41,6 +41,12 @@ get *key* Retrieves the value for the given key. If the key is in the cache, it returns immediately. If not, it performs a query in the database. +set *key* *value* + Stores the *(key, value)* pair in the database. + +del *key* + Removes the key and its associated value from the database. + cas *key* *oldvalue* *newvalue* Do a compare-and-swap, using *oldvalue* to compare with the value stored in the database, and replacing it with *newvalue* if they match. @@ -48,42 +54,19 @@ cas *key* *oldvalue* *newvalue* incr *key* *increment* Increments the value associated to the given key by the given increment. -set_async *key* *value* - Stores the *(key, value)* pair in the database. It does the set in the cache, - queues the operation for the database, and returns. - -del_async *key* - Removes the key and it's associated value from the database. It does the del - in the cache, queues the operation for the database, and returns. - -set_sync *key* *value* - Like *set*, but return only after the database has completed the operation. - -del_sync *key* - Like *del*, but return only after the database has completed the operation. - -cache_get *key* - Like *get*, but only affects the cache and not the database. If the key is - not in the cache, returns a special value indicating "miss". - -cache_set *key* *value* - Like *set*, but only affects the cache and not the database. - -cache_del *key* - Like *del*, but only affects the cache and not the database. - -cache_cas *key* *oldvalue* *newvalue* - Like *cas*, but only affects the cache and not the database. - -cache_incr *key* *increment* - Like *incr*, but only affects the cache and not the database. +Requests can have flags that affect their behaviour.
The *cache-only* flag +makes the operation affect only the cache and not the database; and the +*sync* flag makes the server wait until the request has been performed to +reply, instead of replying as soon as the request is queued for processing. +Not all the flags make sense for all the operations; consult the library +documentation for details. As you can see, it's possible to operate exclusively with the cache, ignoring the database completely. This is very similar to what memcached_ does. Note that the downside is that it's possible to mess with the cache, and leave it -out of sync with the database. You can only do this if you mix *cache_set* -with *set* or *set_sync*, which is hard to miss, so it's unlikely you will do -this. +out of sync with the database. You can only do this if you mix a *cache-only +set* with a normal *set*, which is hard to miss, so it's unlikely you will do +this by mistake. The server assumes you have a brain, and that you will use it. diff --git a/doc/network.rst b/doc/network.rst index 132435f..0c5030e 100644 --- a/doc/network.rst +++ b/doc/network.rst @@ -21,23 +21,32 @@ Requests All requests begin with a common header, and then have a request-specific payload. They look like this:: - +-----+------------+------------------+--- - - - ---+ - | Ver | Request ID | Request code | Payload | - +-----+------------+------------------+--- - - - ---+ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Version | Request ID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Request code | Flags | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + : Payload : + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + Where the fields are: -Ver - Version of the protocol. 4 bits. Must be 1. -Request ID +Ver (4 bits) + Version of the protocol. Must be 1.
+Request ID (28 bits) A number identifying the request. A request is uniquely represented by the *(ID, sender)* pair, where *sender* is the sender host information. IDs can - be reused once a matching response has arrived to the sender. 28 bits. -Request code + be reused once a matching response has arrived to the sender. +Request code (16 bits) The code of the operation to be performed. They're defined in the server - source code. 32 bits. -Payload - The payload is specific to the request code. Some requests can carry no + source code. +Flags (16 bits) + Flags that affect the request code. +Payload (variable, can be absent) + The payload is specific to the request code. Some requests carry no associated payload. @@ -51,38 +60,45 @@ completeness. ============== ====== Name Code ============== ====== -REQ_CACHE_GET 0x101 -REQ_CACHE_SET 0x102 -REQ_CACHE_DEL 0x103 -REQ_GET 0x104 -REQ_SET_SYNC 0x105 -REQ_DEL_SYNC 0x106 -REQ_SET_ASYNC 0x107 -REQ_DEL_ASYNC 0x108 -REQ_CACHE_CAS 0x109 -REQ_CAS 0x110 -REQ_CACHE_INCR 0x111 -REQ_INCR 0x112 +REQ_GET 0x101 +REQ_SET 0x102 +REQ_DEL 0x103 +REQ_CAS 0x104 +REQ_INCR 0x105 ============== ====== +Flags +----- + +Note that not all requests accept all the flags. Flags that are not relevant +for a given request will be ignored. + +================= ====== ============================================= + Name Code Relevant to +================= ====== ============================================= +FLAGS_CACHE_ONLY 1 REQ_GET, REQ_SET, REQ_DEL, REQ_CAS, REQ_INCR +FLAGS_SYNC 2 REQ_SET, REQ_DEL +================= ====== ============================================= + + Request payload formats ----------------------- -REQ_GET and REQ_CACHE_GET +REQ_GET These requests have the same payload format, and only differ on the code. First the key size (32 bits), and then the key. -REQ_SET_* and REQ_CACHE_SET +REQ_SET Like the previous requests, they share the payload format. 
First the key size (32 bits), then the value size (32 bits), then the key, and then the value. -REQ_DEL_* and REQ_CACHE_DEL +REQ_DEL You guessed it, they share the payload format too: first the key size (32 bits), and then the key. -REQ_CAS and REQ_CACHE_CAS +REQ_CAS First the key size, then the old value size, then the new value size, and then the key, the old value and the new value. -REQ_INCR and REQ_CACHE_INCR +REQ_INCR First the key size (32 bits), then the key, and then the increment as a signed network byte order 64 bit integer. diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c index e2ffc59..b6aa331 100644 --- a/libnmdb/libnmdb.c +++ b/libnmdb/libnmdb.c @@ -244,7 +244,8 @@ static struct nmdb_srv *select_srv(nmdb_t *db, /* Creates a new buffer for packets */ static unsigned char *new_packet(struct nmdb_srv *srv, unsigned int request, - size_t *bufsize, size_t *payload_offset, ssize_t payload_size) + unsigned short flags, size_t *bufsize, size_t *payload_offset, + ssize_t payload_size) { unsigned char *buf, *p; unsigned int moff = srv_get_msg_offset(srv); @@ -264,7 +265,8 @@ static unsigned char *new_packet(struct nmdb_srv *srv, unsigned int request, p = buf + moff; * (uint32_t *) p = htonl( (PROTO_VER << 28) | ID_CODE ); - * ((uint32_t *) p + 1) = htonl(request); + * ((uint16_t *) p + 2) = htons(request); + * ((uint16_t *) p + 3) = htons(flags); if (payload_offset != NULL) *payload_offset = moff + 8; @@ -311,22 +313,18 @@ static size_t append_3v(unsigned char *buf, static ssize_t do_get(nmdb_t *db, const unsigned char *key, size_t ksize, - unsigned char *val, size_t vsize, int impact_db) + unsigned char *val, size_t vsize, unsigned short flags) { ssize_t rv, t; unsigned char *buf, *p; size_t bufsize, reqsize, payload_offset, psize = 0; - uint32_t request, reply; + uint32_t reply; struct nmdb_srv *srv; - if (impact_db) { - request = REQ_GET; - } else { - request = REQ_CACHE_GET; - } + flags = flags & NMDB_CACHE_ONLY; srv = select_srv(db, key, ksize); - buf = 
new_packet(srv, request, &bufsize, &payload_offset, -1); + buf = new_packet(srv, REQ_GET, flags, &bufsize, &payload_offset, -1); if (buf == NULL) return -1; reqsize = payload_offset; @@ -371,39 +369,32 @@ exit: ssize_t nmdb_get(nmdb_t *db, const unsigned char *key, size_t ksize, unsigned char *val, size_t vsize) { - return do_get(db, key, ksize, val, vsize, 1); + return do_get(db, key, ksize, val, vsize, 0); } ssize_t nmdb_cache_get(nmdb_t *db, const unsigned char *key, size_t ksize, unsigned char *val, size_t vsize) { - return do_get(db, key, ksize, val, vsize, 0); + return do_get(db, key, ksize, val, vsize, NMDB_CACHE_ONLY); } static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *val, size_t vsize, - int impact_db, int async) + unsigned short flags) { ssize_t rv, t; unsigned char *buf; size_t bufsize, payload_offset, reqsize; - uint32_t request, reply; + uint32_t reply; struct nmdb_srv *srv; - if (impact_db) { - if (async) - request = REQ_SET_ASYNC; - else - request = REQ_SET_SYNC; - } else { - request = REQ_CACHE_SET; - } + flags = flags & (NMDB_CACHE_ONLY | NMDB_SYNC); srv = select_srv(db, key, ksize); - buf = new_packet(srv, request, &bufsize, &payload_offset, + buf = new_packet(srv, REQ_SET, flags, &bufsize, &payload_offset, 4 * 2 + ksize + vsize); if (buf == NULL) return -1; @@ -435,44 +426,38 @@ exit: int nmdb_set(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *val, size_t vsize) { - return do_set(db, key, ksize, val, vsize, 1, 1); + return do_set(db, key, ksize, val, vsize, 0); } int nmdb_set_sync(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *val, size_t vsize) { - return do_set(db, key, ksize, val, vsize, 1, 0); + return do_set(db, key, ksize, val, vsize, NMDB_SYNC); } int nmdb_cache_set(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *val, size_t vsize) { - return do_set(db, key, ksize, val, vsize, 0, 0); + return do_set(db, key, ksize, val, 
vsize, NMDB_CACHE_ONLY); } static int do_del(nmdb_t *db, const unsigned char *key, size_t ksize, - int impact_db, int async) + unsigned short flags) { ssize_t rv, t; unsigned char *buf; size_t bufsize, payload_offset, reqsize; - uint32_t request, reply; + uint32_t reply; struct nmdb_srv *srv; - if (impact_db) { - if (async) - request = REQ_DEL_ASYNC; - else - request = REQ_DEL_SYNC; - } else { - request = REQ_CACHE_DEL; - } + flags = flags & (NMDB_CACHE_ONLY | NMDB_SYNC); srv = select_srv(db, key, ksize); - buf = new_packet(srv, request, &bufsize, &payload_offset, 4 + ksize); + buf = new_packet(srv, REQ_DEL, flags, &bufsize, &payload_offset, + 4 + ksize); if (buf == NULL) return -1; reqsize = payload_offset; @@ -505,38 +490,36 @@ exit: int nmdb_del(nmdb_t *db, const unsigned char *key, size_t ksize) { - return do_del(db, key, ksize, 1, 1); + return do_del(db, key, ksize, 0); } int nmdb_del_sync(nmdb_t *db, const unsigned char *key, size_t ksize) { - return do_del(db, key, ksize, 1, 0); + return do_del(db, key, ksize, NMDB_SYNC); } int nmdb_cache_del(nmdb_t *db, const unsigned char *key, size_t ksize) { - return do_del(db, key, ksize, 0, 0); + return do_del(db, key, ksize, NMDB_CACHE_ONLY); } static int do_cas(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *oldval, size_t ovsize, const unsigned char *newval, size_t nvsize, - int impact_db) + unsigned short flags) { ssize_t rv, t; unsigned char *buf; size_t bufsize, payload_offset, reqsize; - uint32_t request, reply; + uint32_t reply; struct nmdb_srv *srv; - request = REQ_CACHE_CAS; - if (impact_db) - request = REQ_CAS; + flags = flags & NMDB_CACHE_ONLY; srv = select_srv(db, key, ksize); - buf = new_packet(srv, request, &bufsize, &payload_offset, + buf = new_packet(srv, REQ_CAS, flags, &bufsize, &payload_offset, 4 * 3 + ksize + ovsize + nvsize); if (buf == NULL) return -1; @@ -576,14 +559,15 @@ int nmdb_cas(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *oldval, 
size_t ovsize, const unsigned char *newval, size_t nvsize) { - return do_cas(db, key, ksize, oldval, ovsize, newval, nvsize, 1); + return do_cas(db, key, ksize, oldval, ovsize, newval, nvsize, 0); } int nmdb_cache_cas(nmdb_t *db, const unsigned char *key, size_t ksize, const unsigned char *oldval, size_t ovsize, const unsigned char *newval, size_t nvsize) { - return do_cas(db, key, ksize, oldval, ovsize, newval, nvsize, 0); + return do_cas(db, key, ksize, oldval, ovsize, newval, nvsize, + NMDB_CACHE_ONLY); } @@ -616,24 +600,21 @@ static uint64_t htonll(uint64_t x) static int do_incr(nmdb_t *db, const unsigned char *key, size_t ksize, - int64_t increment, int impact_db) + int64_t increment, unsigned short flags) { ssize_t rv, t; unsigned char *buf; size_t bufsize, payload_offset, reqsize; - uint32_t request, reply; + uint32_t reply; struct nmdb_srv *srv; - if (impact_db) - request = REQ_INCR; - else - request = REQ_CACHE_INCR; + flags = flags & NMDB_CACHE_ONLY; srv = select_srv(db, key, ksize); increment = htonll(increment); - buf = new_packet(srv, request, &bufsize, &payload_offset, + buf = new_packet(srv, REQ_INCR, flags, &bufsize, &payload_offset, 4 + ksize + sizeof(int64_t)); if (buf == NULL) return -1; @@ -674,13 +655,13 @@ exit: int nmdb_incr(nmdb_t *db, const unsigned char *key, size_t ksize, int64_t increment) { - return do_incr(db, key, ksize, increment, 1); + return do_incr(db, key, ksize, increment, 0); } int nmdb_cache_incr(nmdb_t *db, const unsigned char *key, size_t ksize, int64_t increment) { - return do_incr(db, key, ksize, increment, 0); + return do_incr(db, key, ksize, increment, NMDB_CACHE_ONLY); } @@ -706,7 +687,7 @@ int nmdb_stats(nmdb_t *db, unsigned char *buf, size_t bsize, for (i = 0; i < db->nservers; i++) { srv = db->servers + i; - request = new_packet(srv, REQ_STATS, &reqsize, NULL, 0); + request = new_packet(srv, REQ_STATS, 0, &reqsize, NULL, 0); t = srv_send(srv, request, reqsize); free(request); diff --git a/libnmdb/net-const.h 
b/libnmdb/net-const.h index 4d8ecd9..290ab90 100644 --- a/libnmdb/net-const.h +++ b/libnmdb/net-const.h @@ -27,19 +27,16 @@ #define PROTO_VER 1 /* Network requests */ -#define REQ_CACHE_GET 0x101 -#define REQ_CACHE_SET 0x102 -#define REQ_CACHE_DEL 0x103 -#define REQ_GET 0x104 -#define REQ_SET_SYNC 0x105 -#define REQ_DEL_SYNC 0x106 -#define REQ_SET_ASYNC 0x107 -#define REQ_DEL_ASYNC 0x108 -#define REQ_CACHE_CAS 0x109 -#define REQ_CAS 0x110 -#define REQ_CACHE_INCR 0x111 -#define REQ_INCR 0x112 -#define REQ_STATS 0x113 +#define REQ_GET 0x101 +#define REQ_SET 0x102 +#define REQ_DEL 0x103 +#define REQ_CAS 0x104 +#define REQ_INCR 0x105 +#define REQ_STATS 0x106 + +/* Possible request flags (which can be applied to the documented requests) */ +#define FLAGS_CACHE_ONLY 1 /* get, set, del, cas, incr */ +#define FLAGS_SYNC 2 /* set, del */ /* Network replies (different namespace from requests) */ #define REP_ERR 0x800 diff --git a/libnmdb/nmdb.skel.h b/libnmdb/nmdb.skel.h index 9756370..1c21a14 100644 --- a/libnmdb/nmdb.skel.h +++ b/libnmdb/nmdb.skel.h @@ -51,6 +51,14 @@ typedef struct nmdb_t { struct nmdb_srv *servers; } nmdb_t; + +/* Possible flags. Note that it may make no sense to mix them; consult the + * documentation before doing weird things.
Values should be kept in sync with + * the internal net-const.h */ +#define NMDB_CACHE_ONLY 1 +#define NMDB_SYNC 2 + + nmdb_t *nmdb_init(); int nmdb_add_tipc_server(nmdb_t *db, int port); int nmdb_add_tcp_server(nmdb_t *db, const char *addr, int port); diff --git a/nmdb/dbloop.c b/nmdb/dbloop.c index 998720d..e50901d 100644 --- a/nmdb/dbloop.c +++ b/nmdb/dbloop.c @@ -100,17 +100,17 @@ static void *db_loop(void *arg) static void process_op(db_t *db, struct queue_entry *e) { int rv; - if (e->operation == REQ_SET_SYNC) { + if (e->operation == REQ_SET) { rv = db_set(db, e->key, e->ksize, e->val, e->vsize); + if (!(e->req->flags & FLAGS_SYNC)) + return; + if (!rv) { e->req->reply_err(e->req, ERR_DB); return; } e->req->reply_mini(e->req, REP_OK); - } else if (e->operation == REQ_SET_ASYNC) { - db_set(db, e->key, e->ksize, e->val, e->vsize); - } else if (e->operation == REQ_GET) { unsigned char *val; size_t vsize = 64 * 1024; @@ -129,17 +129,17 @@ static void process_op(db_t *db, struct queue_entry *e) e->req->reply_long(e->req, REP_OK, val, vsize); free(val); - } else if (e->operation == REQ_DEL_SYNC) { + } else if (e->operation == REQ_DEL) { rv = db_del(db, e->key, e->ksize); + if (!(e->req->flags & FLAGS_SYNC)) + return; + if (rv == 0) { e->req->reply_mini(e->req, REP_NOTIN); return; } e->req->reply_mini(e->req, REP_OK); - } else if (e->operation == REQ_DEL_ASYNC) { - db_del(db, e->key, e->ksize); - } else if (e->operation == REQ_CAS) { unsigned char *dbval; size_t dbvsize = 64 * 1024; diff --git a/nmdb/net-const.h b/nmdb/net-const.h index 4d8ecd9..290ab90 100644 --- a/nmdb/net-const.h +++ b/nmdb/net-const.h @@ -27,19 +27,16 @@ #define PROTO_VER 1 /* Network requests */ -#define REQ_CACHE_GET 0x101 -#define REQ_CACHE_SET 0x102 -#define REQ_CACHE_DEL 0x103 -#define REQ_GET 0x104 -#define REQ_SET_SYNC 0x105 -#define REQ_DEL_SYNC 0x106 -#define REQ_SET_ASYNC 0x107 -#define REQ_DEL_ASYNC 0x108 -#define REQ_CACHE_CAS 0x109 -#define REQ_CAS 0x110 -#define REQ_CACHE_INCR 
0x111 -#define REQ_INCR 0x112 -#define REQ_STATS 0x113 +#define REQ_GET 0x101 +#define REQ_SET 0x102 +#define REQ_DEL 0x103 +#define REQ_CAS 0x104 +#define REQ_INCR 0x105 +#define REQ_STATS 0x106 + +/* Possible request flags (which can be applied to the documented requests) */ +#define FLAGS_CACHE_ONLY 1 /* get, set, del, cas, incr */ +#define FLAGS_SYNC 2 /* set, del */ /* Network replies (different namespace from requests) */ #define REP_ERR 0x800 diff --git a/nmdb/parse.c b/nmdb/parse.c index 46c6877..e7c9b72 100644 --- a/nmdb/parse.c +++ b/nmdb/parse.c @@ -12,11 +12,11 @@ #include "common.h" -static void parse_get(struct req_info *req, int impact_db); -static void parse_set(struct req_info *req, int impact_db, int async); -static void parse_del(struct req_info *req, int impact_db, int async); -static void parse_cas(struct req_info *req, int impact_db); -static void parse_incr(struct req_info *req, int impact_db); +static void parse_get(struct req_info *req); +static void parse_set(struct req_info *req); +static void parse_del(struct req_info *req); +static void parse_cas(struct req_info *req); +static void parse_incr(struct req_info *req); static void parse_stats(struct req_info *req); @@ -120,13 +120,15 @@ static struct queue_entry *make_queue_entry(struct req_info *req, int parse_message(struct req_info *req, const unsigned char *buf, size_t len) { - uint32_t hdr, ver, id, cmd; + uint32_t hdr, ver, id; + uint16_t cmd, flags; const unsigned char *payload; size_t psize; /* The header is: * 4 bytes Version + ID - * 4 bytes Command + * 2 bytes Command + * 2 bytes Flags * Variable Payload */ @@ -138,7 +140,8 @@ int parse_message(struct req_info *req, id = hdr & 0x0FFFFFFF; req->id = id; - cmd = ntohl(* ((uint32_t *) buf + 1)); + cmd = ntohs(* ((uint16_t *) buf + 2)); + flags = ntohs(* ((uint16_t *) buf + 3)); if (ver != PROTO_VER) { stats.net_version_mismatch++; @@ -157,45 +160,20 @@ int parse_message(struct req_info *req, * to calculate it at send time. 
*/ req->id = htonl(id); req->cmd = cmd; + req->flags = flags; req->payload = payload; req->psize = psize; - if (cmd == REQ_CACHE_GET) { - stats.cache_get++; - parse_get(req, 0); - } else if (cmd == REQ_CACHE_SET) { - stats.cache_set++; - parse_set(req, 0, 0); - } else if (cmd == REQ_CACHE_DEL) { - stats.cache_del++; - parse_del(req, 0, 0); - } else if (cmd == REQ_GET) { - stats.db_get++; - parse_get(req, 1); - } else if (cmd == REQ_SET_SYNC) { - stats.db_set++; - parse_set(req, 1, 0); - } else if (cmd == REQ_DEL_SYNC) { - stats.db_del++; - parse_del(req, 1, 0); - } else if (cmd == REQ_SET_ASYNC) { - stats.db_set++; - parse_set(req, 1, 1); - } else if (cmd == REQ_DEL_ASYNC) { - stats.db_del++; - parse_del(req, 1, 1); - } else if (cmd == REQ_CACHE_CAS) { - stats.cache_cas++; - parse_cas(req, 0); + if (cmd == REQ_GET) { + parse_get(req); + } else if (cmd == REQ_SET) { + parse_set(req); + } else if (cmd == REQ_DEL) { + parse_del(req); } else if (cmd == REQ_CAS) { - stats.db_cas++; - parse_cas(req, 1); - } else if (cmd == REQ_CACHE_INCR) { - stats.cache_incr++; - parse_incr(req, 0); + parse_cas(req); } else if (cmd == REQ_INCR) { - stats.db_incr++; - parse_incr(req, 1); + parse_incr(req); } else if (cmd == REQ_STATS) { parse_stats(req); } else { @@ -207,9 +185,23 @@ int parse_message(struct req_info *req, } -static void parse_get(struct req_info *req, int impact_db) +/* Small macros used to handle flags in the parse_*() functions */ +#define FILL_CACHE_FLAG(OP) \ + do { \ + cache_only = req->flags & FLAGS_CACHE_ONLY; \ + if (cache_only) stats.cache_##OP++; \ + else stats.db_##OP++; \ + } while (0) + +#define FILL_SYNC_FLAG() \ + do { \ + sync = req->flags & FLAGS_SYNC; \ + } while(0) + + +static void parse_get(struct req_info *req) { - int hit; + int hit, cache_only; const unsigned char *key; uint32_t ksize; unsigned char *val = NULL; @@ -223,15 +215,17 @@ static void parse_get(struct req_info *req, int impact_db) return; } + FILL_CACHE_FLAG(get); + key = req->payload + 
sizeof(uint32_t); hit = cache_get(cache_table, key, ksize, &val, &vsize); - if (!hit && !impact_db) { + if (cache_only && !hit) { stats.cache_misses++; req->reply_mini(req, REP_CACHE_MISS); return; - } else if (!hit && impact_db) { + } else if (!cache_only && !hit) { struct queue_entry *e; e = make_queue_entry(req, REQ_GET, key, ksize, NULL, 0); if (e == NULL) { @@ -250,10 +244,9 @@ static void parse_get(struct req_info *req, int impact_db) } } - -static void parse_set(struct req_info *req, int impact_db, int async) +static void parse_set(struct req_info *req) { - int rv; + int rv, cache_only, sync; const unsigned char *key, *val; uint32_t ksize, vsize; const int max = 65536; @@ -282,6 +275,9 @@ static void parse_set(struct req_info *req, int impact_db, int async) return; } + FILL_CACHE_FLAG(set); + FILL_SYNC_FLAG(); + key = req->payload + sizeof(uint32_t) * 2; val = key + ksize; @@ -291,15 +287,10 @@ static void parse_set(struct req_info *req, int impact_db, int async) return; } - if (impact_db) { + if (!cache_only) { struct queue_entry *e; - uint32_t request; - - request = REQ_SET_SYNC; - if (async) - request = REQ_SET_ASYNC; - e = make_queue_entry(req, request, key, ksize, val, vsize); + e = make_queue_entry(req, REQ_SET, key, ksize, val, vsize); if (e == NULL) { req->reply_err(req, ERR_MEM); return; @@ -308,16 +299,17 @@ static void parse_set(struct req_info *req, int impact_db, int async) queue_put(op_queue, e); queue_unlock(op_queue); - if (async) { - req->reply_mini(req, REP_OK); - } else { + if (sync) { /* Signal the DB thread it has work only if it's a * synchronous operation, asynchronous don't mind * waiting. It does have a measurable impact on * performance (2083847usec vs 2804973usec for sets on * "test2d 100000 10 10"). 
*/ queue_signal(op_queue); + } else { + req->reply_mini(req, REP_OK); } + return; } else { req->reply_mini(req, REP_OK); @@ -327,9 +319,9 @@ static void parse_set(struct req_info *req, int impact_db, int async) } -static void parse_del(struct req_info *req, int impact_db, int async) +static void parse_del(struct req_info *req) { - int hit; + int hit, cache_only, sync; const unsigned char *key; uint32_t ksize; @@ -341,23 +333,21 @@ static void parse_del(struct req_info *req, int impact_db, int async) return; } + FILL_CACHE_FLAG(del); + FILL_SYNC_FLAG(); + key = req->payload + sizeof(uint32_t); hit = cache_del(cache_table, key, ksize); - if (!impact_db && hit) { + if (cache_only && hit) { req->reply_mini(req, REP_OK); - } else if (!impact_db && !hit) { + } else if (cache_only && !hit) { req->reply_mini(req, REP_NOTIN); - } else if (impact_db) { + } else if (!cache_only) { struct queue_entry *e; - uint32_t request; - request = REQ_DEL_SYNC; - if (async) - request = REQ_DEL_ASYNC; - - e = make_queue_entry(req, request, key, ksize, NULL, 0); + e = make_queue_entry(req, REQ_DEL, key, ksize, NULL, 0); if (e == NULL) { req->reply_err(req, ERR_MEM); return; @@ -366,11 +356,11 @@ static void parse_del(struct req_info *req, int impact_db, int async) queue_put(op_queue, e); queue_unlock(op_queue); - if (async) { - req->reply_mini(req, REP_OK); - } else { + if (sync) { /* See comment on parse_set(). 
*/ queue_signal(op_queue); + } else { + req->reply_mini(req, REP_OK); } return; @@ -379,9 +369,9 @@ static void parse_del(struct req_info *req, int impact_db, int async) return; } -static void parse_cas(struct req_info *req, int impact_db) +static void parse_cas(struct req_info *req) { - int rv; + int rv, cache_only; const unsigned char *key, *oldval, *newval; uint32_t ksize, ovsize, nvsize; const int max = 65536; @@ -416,6 +406,8 @@ static void parse_cas(struct req_info *req, int impact_db) return; } + FILL_CACHE_FLAG(cas); + key = req->payload + sizeof(uint32_t) * 3; oldval = key + ksize; newval = oldval + ovsize; @@ -429,7 +421,7 @@ static void parse_cas(struct req_info *req, int impact_db) return; } - if (!impact_db) { + if (cache_only) { if (rv == -1) { req->reply_mini(req, REP_NOTIN); return; @@ -438,7 +430,7 @@ static void parse_cas(struct req_info *req, int impact_db) return; } } else { - /* impact_db = 1 and the key is either not in the cache, or + /* !cache_only and the key is either not in the cache, or * cache_cas() was successful. We now need to queue the CAS in * the database. 
*/ struct queue_entry *e; @@ -508,9 +500,9 @@ static uint64_t htonll(uint64_t x) } -static void parse_incr(struct req_info *req, int impact_db) +static void parse_incr(struct req_info *req) { - int cres; + int cres, cache_only; const unsigned char *key; uint32_t ksize; int64_t increment; @@ -534,6 +526,8 @@ static void parse_incr(struct req_info *req, int impact_db) return; } + FILL_CACHE_FLAG(incr); + key = req->payload + sizeof(uint32_t); increment = ntohll( * (int64_t *) (key + ksize) ); @@ -547,7 +541,7 @@ static void parse_incr(struct req_info *req, int impact_db) return; } - if (impact_db) { + if (!cache_only) { struct queue_entry *e; /* at this point, the cache_incr() was either successful or a diff --git a/nmdb/req.h b/nmdb/req.h index 4637307..21ac384 100644 --- a/nmdb/req.h +++ b/nmdb/req.h @@ -25,6 +25,7 @@ struct req_info { /* operation information */ uint32_t id; uint32_t cmd; + uint16_t flags; const unsigned char *payload; size_t psize;