author | Alberto Bertogli
<albertito@blitiri.com.ar> 2010-04-26 06:46:58 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2010-04-26 19:04:22 UTC |
parent | ba6c6296958c9e4dc89bed26bf7d83f184a30212 |
bindings/python/nmdb.py | +29 | -0 |
bindings/python/nmdb_ll.c | +74 | -0 |
libnmdb/doxygen/Doxyfile.public | +1 | -1 |
libnmdb/doxygen/groups.doxy | +0 | -12 |
libnmdb/libnmdb.c | +110 | -0 |
libnmdb/nmdb.h | +72 | -0 |
nmdb/be-bdb.c | +2 | -0 |
nmdb/be-null.c | +2 | -0 |
nmdb/be-qdbm.c | +2 | -0 |
nmdb/be-tc.c | +2 | -0 |
nmdb/be-tdb.c | +39 | -0 |
nmdb/be.h | +4 | -1 |
nmdb/dbloop.c | +45 | -0 |
nmdb/net-const.h | +2 | -0 |
nmdb/parse.c | +51 | -0 |
nmdb/stats.c | +3 | -0 |
nmdb/stats.h | +3 | -1 |
utils/nmdb-stats.c | +5 | -1 |
diff --git a/bindings/python/nmdb.py b/bindings/python/nmdb.py index e185d55..b5b73c4 100644 --- a/bindings/python/nmdb.py +++ b/bindings/python/nmdb.py @@ -197,6 +197,31 @@ class GenericDB (object): def normal_incr(self, key, increment = 1): return self.generic_incr(self._db.incr, key, increment) + def firstkey(self): + try: + r = self._db.firstkey() + except: + raise NetworkError + if r == -1: + # No keys, or unsupported + raise KeyError + if self.autopickle: + r = pickle.loads(r) + return r + + def nextkey(self, key): + if self.autopickle: + key = pickle.dumps(key, protocol = -1) + try: + r = self._db.nextkey(key) + except: + raise NetworkError + if r == -1: + # No next key, or unsupported + raise KeyError + if self.autopickle: + r = pickle.loads(r) + return r # The following functions will assume the existance of self.set, # self.get, and self.delete, which are supposed to be set by our @@ -240,6 +265,8 @@ class DB (GenericDB): delete = GenericDB.normal_delete cas = GenericDB.normal_cas incr = GenericDB.normal_incr + firstkey = GenericDB.firstkey + nextkey = GenericDB.nextkey class SyncDB (GenericDB): get = GenericDB.normal_get @@ -247,5 +274,7 @@ class SyncDB (GenericDB): delete = GenericDB.delete_sync cas = GenericDB.normal_cas incr = GenericDB.normal_incr + firstkey = GenericDB.firstkey + nextkey = GenericDB.nextkey diff --git a/bindings/python/nmdb_ll.c b/bindings/python/nmdb_ll.c index c4bdd91..9b51bbb 100644 --- a/bindings/python/nmdb_ll.c +++ b/bindings/python/nmdb_ll.c @@ -355,6 +355,78 @@ static PyObject *db_delete_sync(nmdbobject *db, PyObject *args) return PyLong_FromLong(rv); } +/* firstkey */ +static PyObject *db_firstkey(nmdbobject *db, PyObject *args) +{ + unsigned char *key; + int ksize; + long rv; + PyObject *r; + + if (!PyArg_ParseTuple(args, "")) { + return NULL; + } + + /* ksize is enough to hold the any value */ + ksize = 128 * 1024; + key = malloc(ksize); + if (key== NULL) + return PyErr_NoMemory(); + + Py_BEGIN_ALLOW_THREADS + rv = 
nmdb_firstkey(db->db, key, ksize); + Py_END_ALLOW_THREADS + + if (rv <= -2) { + /* FIXME: define a better exception */ + r = PyErr_SetFromErrno(PyExc_IOError); + } else if (rv == -1) { + /* No first key or unsupported, handled in the high-level + * module. */ + r = PyLong_FromLong(-1); + } else { + r = PyString_FromStringAndSize((char *) key, rv); + } + + free(key); + return r; +} + +/* nextkey */ +static PyObject *db_nextkey(nmdbobject *db, PyObject *args) +{ + unsigned char *key, *newkey; + int ksize, nksize; + long rv; + PyObject *r; + + if (!PyArg_ParseTuple(args, "s#:nextkey", &key, &ksize)) { + return NULL; + } + + /* nksize is enough to hold the any value */ + nksize = 128 * 1024; + newkey = malloc(nksize); + if (newkey == NULL) + return PyErr_NoMemory(); + + Py_BEGIN_ALLOW_THREADS + rv = nmdb_nextkey(db->db, key, ksize, newkey, nksize); + Py_END_ALLOW_THREADS + + if (rv <= -2) { + /* FIXME: define a better exception */ + r = PyErr_SetFromErrno(PyExc_IOError); + } else if (rv == -1) { + /* End, handled in the high-level module. 
*/ + r = PyLong_FromLong(-1); + } else { + r = PyString_FromStringAndSize((char *) newkey, rv); + } + + free(newkey); + return r; +} /* nmdb method table */ @@ -378,6 +450,8 @@ static PyMethodDef nmdb_methods[] = { { "incr", (PyCFunction) db_incr, METH_VARARGS, NULL }, { "set_sync", (PyCFunction) db_set_sync, METH_VARARGS, NULL }, { "delete_sync", (PyCFunction) db_delete_sync, METH_VARARGS, NULL }, + { "firstkey", (PyCFunction) db_firstkey, METH_VARARGS, NULL }, + { "nextkey", (PyCFunction) db_nextkey, METH_VARARGS, NULL }, { NULL } }; diff --git a/libnmdb/doxygen/Doxyfile.public b/libnmdb/doxygen/Doxyfile.public index efc5aa7..9c9c209 100644 --- a/libnmdb/doxygen/Doxyfile.public +++ b/libnmdb/doxygen/Doxyfile.public @@ -4,5 +4,5 @@ OUTPUT_DIRECTORY = doc.public EXTRACT_ALL = YES EXTRACT_STATIC = NO INTERNAL_DOCS = NO -INPUT = ../nmdb.h groups.doxy +INPUT = ../nmdb.h diff --git a/libnmdb/doxygen/groups.doxy b/libnmdb/doxygen/groups.doxy deleted file mode 100644 index 9e1038f..0000000 --- a/libnmdb/doxygen/groups.doxy +++ /dev/null @@ -1,12 +0,0 @@ - -/** -@addtogroup connection Connection API -Functions used to connect to the servers. - -@addtogroup database Database API -Functions that affect the database and the cache. - -@addtogroup cache Cache API -Functions that affect only the cache. 
-*/ - diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c index 8f975de..3ad8e0c 100644 --- a/libnmdb/libnmdb.c +++ b/libnmdb/libnmdb.c @@ -657,6 +657,116 @@ int nmdb_cache_incr(nmdb_t *db, const unsigned char *key, size_t ksize, } +ssize_t nmdb_firstkey(nmdb_t *db, unsigned char *key, size_t ksize) +{ + ssize_t rv, t; + unsigned char *buf, *p; + size_t bufsize, payload_offset, psize = 0; + uint32_t reply; + struct nmdb_srv *srv; + + if (db->nservers != 1) { + return -2; + } + srv = &(db->servers[0]); + + buf = new_packet(srv, REQ_FIRSTKEY, 0, &bufsize, &payload_offset, -1); + if (buf == NULL) + return -2; + + t = srv_send(srv, buf, payload_offset); + if (t <= 0) { + rv = -2; + goto exit; + } + + reply = get_rep(srv, buf, bufsize, &p, &psize); + + if (reply == REP_NOTIN) { + rv = -1; + goto exit; + } else if (reply == REP_ERR) { + rv = -2; + goto exit; + } else if (reply != REP_OK) { + /* invalid response */ + rv = -2; + goto exit; + } + + /* we've got an answer */ + rv = * (uint32_t *) p; + rv = ntohl(rv); + if (rv > (psize - 4) || rv > ksize) { + /* the value is too big for the packet size, or it is too big + * to fit in the buffer we were given */ + rv = -2; + goto exit; + } + memcpy(key, p + 4, rv); + +exit: + free(buf); + return rv; + +} + +ssize_t nmdb_nextkey(nmdb_t *db, const unsigned char *key, size_t ksize, + unsigned char *newkey, size_t nksize) +{ + ssize_t rv, t; + unsigned char *buf, *p; + size_t bufsize, reqsize, payload_offset, psize = 0; + uint32_t reply; + struct nmdb_srv *srv; + + if (db->nservers != 1) + return -2; + srv = &(db->servers[0]); + + buf = new_packet(srv, REQ_NEXTKEY, 0, &bufsize, &payload_offset, -1); + if (buf == NULL) + return -1; + reqsize = payload_offset; + reqsize += append_1v(buf + payload_offset, key, ksize); + + t = srv_send(srv, buf, reqsize); + if (t <= 0) { + rv = -2; + goto exit; + } + + reply = get_rep(srv, buf, bufsize, &p, &psize); + + if (reply == REP_NOTIN) { + rv = -1; + goto exit; + } else if (reply == REP_ERR) { 
+ rv = -2; + goto exit; + } else if (reply != REP_OK) { + /* invalid response */ + rv = -2; + goto exit; + } + + /* we've got an answer */ + rv = * (uint32_t *) p; + rv = ntohl(rv); + if (rv > (psize - 4) || rv > nksize) { + /* the value is too big for the packet size, or it is too big + * to fit in the buffer we were given */ + rv = -2; + goto exit; + } + memcpy(newkey, p + 4, rv); + +exit: + free(buf); + return rv; +} + + /* Request servers' statistics, return the aggregated results in buf, with the * number of servers in nservers and the number of stats per server in nstats. * Used in the "nmdb-stats" utility, matches the server version. diff --git a/libnmdb/nmdb.h b/libnmdb/nmdb.h index 9159d82..cf18485 100644 --- a/libnmdb/nmdb.h +++ b/libnmdb/nmdb.h @@ -4,9 +4,16 @@ #ifndef _NMDB_H #define _NMDB_H + /** Opaque type representing a connection with one or more servers. */ typedef struct nmdb_conn nmdb_t; + +/** + * @addtogroup connection Connection API + * Functions used to connect to the servers. + */ + /** Create a new nmdb_t pointer, used to talk with the server. * * @returns a new connection instance. @@ -62,6 +69,15 @@ int nmdb_add_sctp_server(nmdb_t *db, const char *addr, int port); */ int nmdb_free(nmdb_t *db); + +/** + * @addtogroup database Database API + * Functions that affect the database and the cache. + * + * @addtogroup cache Cache API + * Functions that affect only the cache. + */ + /** Get the value associated with a key. * * @param db connection instance. @@ -261,6 +277,61 @@ int nmdb_cache_incr(nmdb_t *db, const unsigned char *key, size_t ksize, int64_t increment, int64_t *newval); +/** + * @addtogroup utility Functions used in nmdb utilities + * These functions are used almost exclusively by nmdb utilities, although + * they may be used by external applications. They often require some + * knowledge about nmdb's inner workings so they should be used with care. + */ + +/** Get the first key. 
+ * Returns the first key in the database, which can then be used to get the + * following one with nmdb_nextkey(). Together, they can be used to iterate + * over all the keys of a *single server*. It has some caveats: + * + * - It will fail if db has more than one server. + * - If the database is being modified during iteration, the walk can result + * in skipping nodes or walking the same one twice. + * - There is absolutely no guarantee about the order of the keys. + * - The order is not stable and must not be relied upon. + * + * This is almost exclusively used for replication utilities. + * + * @param db connection instance. + * @param[out] key buffer where the first key is stored. + * @param ksize the size of the key buffer. + * @returns -2 on error, -1 if the database is empty, or the key size on + * success. + * @ingroup utility + */ +ssize_t nmdb_firstkey(nmdb_t *db, unsigned char *key, size_t ksize); + +/** Get the key that follows the one given. + * Together with nmdb_firstkey(), this function is used to iterate over all + * the keys of a *single server*: each call returns the key that follows + * the one given. It has some caveats: + * + * - It will fail if db has more than one server. + * - If the database is being modified during iteration, the walk can result + * in skipping nodes or walking the same one twice. + * - There is absolutely no guarantee about the order of the keys. + * - The order is not stable and must not be relied upon. + * + * This is almost exclusively used for replication utilities. + * + * @param db connection instance. + * @param key the current key. + * @param ksize the current key size. + * @param[out] newkey buffer where the key that follows the current one is stored. + * @param nksize the size of the newkey buffer. + * @returns -2 on error, -1 if no key follows the given one, or the new key + * size on success. + * @ingroup utility + */ +ssize_t nmdb_nextkey(nmdb_t *db, const unsigned char *key, size_t ksize, + unsigned char *newkey, size_t nksize); + + /** Request servers' statistics. 
* This API is used by nmdb-stats, and likely to change in the future. Do not * rely on it. @@ -274,6 +345,7 @@ int nmdb_cache_incr(nmdb_t *db, const unsigned char *key, size_t ksize, * was a network error, -3 if the buffer was too small, -4 if the server * replies were of different size (indicates different server versions, * not supported at the time) + * @ingroup utility */ int nmdb_stats(nmdb_t *db, unsigned char *buf, size_t bsize, unsigned int *nservers, unsigned int *nstats); diff --git a/nmdb/be-bdb.c b/nmdb/be-bdb.c index edfd0d1..55abd26 100644 --- a/nmdb/be-bdb.c +++ b/nmdb/be-bdb.c @@ -47,6 +47,8 @@ struct db_conn *bdb_open(const char *name, int flags) db->set = bdb_set; db->get = bdb_get; db->del = bdb_del; + db->firstkey = NULL; + db->nextkey = NULL; db->close = bdb_close; return db; diff --git a/nmdb/be-null.c b/nmdb/be-null.c index 79d67b6..7b8d85e 100644 --- a/nmdb/be-null.c +++ b/nmdb/be-null.c @@ -26,6 +26,8 @@ struct db_conn *null_open(const char *name, int flags) db->set = null_set; db->get = null_get; db->del = null_del; + db->firstkey = NULL; + db->nextkey = NULL; db->close = null_close; return db; diff --git a/nmdb/be-qdbm.c b/nmdb/be-qdbm.c index 409fecf..4a1291e 100644 --- a/nmdb/be-qdbm.c +++ b/nmdb/be-qdbm.c @@ -36,6 +36,8 @@ struct db_conn *qdbm_open(const char *name, int flags) db->set = qdbm_set; db->get = qdbm_get; db->del = qdbm_del; + db->firstkey = NULL; + db->nextkey = NULL; db->close = qdbm_close; return db; diff --git a/nmdb/be-tc.c b/nmdb/be-tc.c index 2b45057..319a844 100644 --- a/nmdb/be-tc.c +++ b/nmdb/be-tc.c @@ -34,6 +34,8 @@ struct db_conn *tc_open(const char *name, int flags) db->set = tc_set; db->get = tc_get; db->del = tc_del; + db->firstkey = NULL; + db->nextkey = NULL; db->close = tc_close; return db; diff --git a/nmdb/be-tdb.c b/nmdb/be-tdb.c index bd81789..0c3fca1 100644 --- a/nmdb/be-tdb.c +++ b/nmdb/be-tdb.c @@ -21,6 +21,10 @@ int xtdb_set(struct db_conn *db, const unsigned char *key, size_t ksize, int 
xtdb_get(struct db_conn *db, const unsigned char *key, size_t ksize, unsigned char *val, size_t *vsize); int xtdb_del(struct db_conn *db, const unsigned char *key, size_t ksize); +int xtdb_firstkey(struct db_conn *db, unsigned char *key, size_t *ksize); +int xtdb_nextkey(struct db_conn *db, + const unsigned char *key, size_t ksize, + unsigned char *nextkey, size_t *nksize); int xtdb_close(struct db_conn *db); @@ -43,6 +47,8 @@ struct db_conn *xtdb_open(const char *name, int flags) db->set = xtdb_set; db->get = xtdb_get; db->del = xtdb_del; + db->firstkey = xtdb_firstkey; + db->nextkey = xtdb_nextkey; db->close = xtdb_close; return db; @@ -103,6 +109,39 @@ int xtdb_del(struct db_conn *db, const unsigned char *key, size_t ksize) return tdb_delete(db->conn, k) == 0; } +int xtdb_firstkey(struct db_conn *db, unsigned char *key, size_t *ksize) +{ + TDB_DATA k; + + k = tdb_firstkey(db->conn); + if (k.dptr == NULL) + return 0; + + *ksize = k.dsize; + memcpy(key, k.dptr, k.dsize); + free(k.dptr); + return 1; +} + +int xtdb_nextkey(struct db_conn *db, + const unsigned char *key, size_t ksize, + unsigned char *nextkey, size_t *nksize) +{ + TDB_DATA pk, nk; + + pk.dptr = (unsigned char *) key; + pk.dsize = ksize; + + nk = tdb_nextkey(db->conn, pk); + if (nk.dptr == NULL) + return 0; + + *nksize = nk.dsize; + memcpy(nextkey, nk.dptr, nk.dsize); + free(nk.dptr); + return 1; +} + #else #include <stddef.h> /* NULL */ diff --git a/nmdb/be.h b/nmdb/be.h index 7f9e3a1..8c40fa5 100644 --- a/nmdb/be.h +++ b/nmdb/be.h @@ -15,8 +15,11 @@ struct db_conn { int (*get)(struct db_conn *db, const unsigned char *key, size_t ksize, unsigned char *val, size_t *vsize); int (*del)(struct db_conn *db, const unsigned char *key, size_t ksize); + int (*firstkey)(struct db_conn *db, unsigned char *key, size_t *ksize); + int (*nextkey)(struct db_conn *db, + const unsigned char *key, size_t ksize, + unsigned char *nextkey, size_t *nksize); int (*close)(struct db_conn *db); - }; enum backend_type { diff 
--git a/nmdb/dbloop.c b/nmdb/dbloop.c index 4fe75b7..7a4f4fd 100644 --- a/nmdb/dbloop.c +++ b/nmdb/dbloop.c @@ -228,6 +228,51 @@ static void process_op(struct db_conn *db, struct queue_entry *e) free(dbval); + } else if (e->operation == REQ_FIRSTKEY) { + unsigned char *key; + size_t ksize = 64 * 1024; + + if (db->firstkey == NULL) { + e->req->reply_err(e->req, ERR_DB); + return; + } + + key = malloc(ksize); + if (key == NULL) { + e->req->reply_err(e->req, ERR_MEM); + return; + } + rv = db->firstkey(db, key, &ksize); + if (rv == 0) { + e->req->reply_mini(e->req, REP_NOTIN); + free(key); + return; + } + e->req->reply_long(e->req, REP_OK, key, ksize); + free(key); + + } else if (e->operation == REQ_NEXTKEY) { + unsigned char *newkey; + size_t nksize = 64 * 1024; + + if (db->nextkey == NULL) { + e->req->reply_err(e->req, ERR_DB); + return; + } + + newkey = malloc(nksize); + if (newkey == NULL) { + e->req->reply_err(e->req, ERR_MEM); + return; + } + rv = db->nextkey(db, e->key, e->ksize, newkey, &nksize); + if (rv == 0) { + e->req->reply_mini(e->req, REP_NOTIN); + free(newkey); + return; + } + e->req->reply_long(e->req, REP_OK, newkey, nksize); + free(newkey); } else { wlog("Unknown op 0x%x\n", e->operation); } diff --git a/nmdb/net-const.h b/nmdb/net-const.h index 69aeddb..3fb80bc 100644 --- a/nmdb/net-const.h +++ b/nmdb/net-const.h @@ -33,6 +33,8 @@ #define REQ_CAS 0x104 #define REQ_INCR 0x105 #define REQ_STATS 0x106 +#define REQ_FIRSTKEY 0x107 +#define REQ_NEXTKEY 0x108 /* Possible request flags (which can be applied to the documented requests) */ #define FLAGS_CACHE_ONLY 1 /* get, set, del, cas, incr */ diff --git a/nmdb/parse.c b/nmdb/parse.c index f8fc7d8..3456426 100644 --- a/nmdb/parse.c +++ b/nmdb/parse.c @@ -17,6 +17,8 @@ static void parse_set(struct req_info *req); static void parse_del(struct req_info *req); static void parse_cas(struct req_info *req); static void parse_incr(struct req_info *req); +static void parse_firstkey(struct req_info *req); +static 
void parse_nextkey(struct req_info *req); static void parse_stats(struct req_info *req); @@ -204,6 +206,10 @@ int parse_message(struct req_info *req, parse_cas(req); } else if (cmd == REQ_INCR) { parse_incr(req); + } else if (cmd == REQ_FIRSTKEY) { + parse_firstkey(req); + } else if (cmd == REQ_NEXTKEY) { + parse_nextkey(req); } else if (cmd == REQ_STATS) { parse_stats(req); } else { @@ -539,6 +545,48 @@ static void parse_incr(struct req_info *req) } +static void parse_firstkey(struct req_info *req) +{ + int rv; + const unsigned char *key; + + stats.db_firstkey++; + + key = req->payload + sizeof(uint32_t); + + rv = put_in_queue(req, REQ_FIRSTKEY, 1, NULL, 0, NULL, 0); + if (!rv) { + req->reply_err(req, ERR_MEM); + return; + } +} + +static void parse_nextkey(struct req_info *req) +{ + int rv; + const unsigned char *key; + uint32_t ksize; + + ksize = * (uint32_t *) req->payload; + ksize = ntohl(ksize); + if (req->psize < ksize) { + stats.net_broken_req++; + req->reply_err(req, ERR_BROKEN); + return; + } + + stats.db_nextkey++; + + key = req->payload + sizeof(uint32_t); + + rv = put_in_queue(req, REQ_NEXTKEY, 1, key, ksize, NULL, 0); + if (!rv) { + req->reply_err(req, ERR_MEM); + return; + } +} + + static void parse_stats(struct req_info *req) { int i; @@ -583,6 +631,9 @@ static void parse_stats(struct req_info *req) fcpy(net_broken_req); fcpy(net_unk_req); + fcpy(db_firstkey); + fcpy(db_nextkey); + req->reply_long(req, REP_OK, (unsigned char *) response, sizeof(response)); diff --git a/nmdb/stats.c b/nmdb/stats.c index 8cd85ee..fed845b 100644 --- a/nmdb/stats.c +++ b/nmdb/stats.c @@ -26,6 +26,9 @@ void stats_init(struct stats *s) s->net_version_mismatch = 0; s->net_broken_req = 0; s->net_unk_req = 0; + + s->db_firstkey = 0; + s->db_nextkey = 0; } diff --git a/nmdb/stats.h b/nmdb/stats.h index 8d85ff6..92d53d4 100644 --- a/nmdb/stats.h +++ b/nmdb/stats.h @@ -31,9 +31,11 @@ struct stats { unsigned long net_version_mismatch; unsigned long net_broken_req; /* 20 */ 
unsigned long net_unk_req; + unsigned long db_firstkey; + unsigned long db_nextkey; }; -#define STATS_REPLY_SIZE 21 +#define STATS_REPLY_SIZE 23 void stats_init(struct stats *s); diff --git a/utils/nmdb-stats.c b/utils/nmdb-stats.c index d9f1647..be5733e 100644 --- a/utils/nmdb-stats.c +++ b/utils/nmdb-stats.c @@ -8,6 +8,7 @@ #include <stdint.h> /* uint64_t */ #include <string.h> /* strcmp() */ #include <stdlib.h> /* atoi() */ +#include <arpa/inet.h> /* htonl() and friends */ #include "nmdb.h" @@ -108,6 +109,7 @@ int main(int argc, char **argv) j = nstats * i; + /* note they are not necessarily in numerical order */ shst("cache get", 0); shst("cache set", 1); shst("cache del", 2); @@ -119,6 +121,8 @@ int main(int argc, char **argv) shst("db del", 7); shst("db cas", 8); shst("db incr", 9); + shst("db firstkey", 21); + shst("db nextkey", 22); shst("cache hits", 10); shst("cache misses", 11); @@ -136,7 +140,7 @@ int main(int argc, char **argv) shst("unknown requests", 20); /* if there are any fields we don't know, show them anyway */ - for (k = 21; k < nstats; k++) { + for (k = 23; k < nstats; k++) { shst("unknown field", k); }