author | Alberto Bertogli
<albertito@gmail.com> 2006-09-11 05:17:57 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2006-09-11 05:17:57 UTC |
LICENSE | +13 | -0 |
README | +49 | -0 |
TODO | +15 | -0 |
libnmdb/LICENSE | +34 | -0 |
libnmdb/Makefile | +70 | -0 |
libnmdb/libnmdb.3 | +114 | -0 |
libnmdb/libnmdb.c | +344 | -0 |
libnmdb/net-const.h | +43 | -0 |
libnmdb/nmdb.h | +35 | -0 |
libnmdb/test1c.c | +84 | -0 |
libnmdb/test1d.c | +84 | -0 |
libnmdb/test2c.c | +92 | -0 |
libnmdb/test2d.c | +92 | -0 |
libnmdb/timer.h | +39 | -0 |
nmdb/LICENSE | +184 | -0 |
nmdb/Makefile | +40 | -0 |
nmdb/be-qdbm.c | +48 | -0 |
nmdb/be.h | +21 | -0 |
nmdb/cache.c | +308 | -0 |
nmdb/cache.h | +49 | -0 |
nmdb/common.h | +31 | -0 |
nmdb/db.c | +125 | -0 |
nmdb/db.h | +12 | -0 |
nmdb/main.c | +155 | -0 |
nmdb/net-const.h | +43 | -0 |
nmdb/net.c | +47 | -0 |
nmdb/net.h | +8 | -0 |
nmdb/nmdb.1 | +86 | -0 |
nmdb/queue.c | +119 | -0 |
nmdb/queue.h | +43 | -0 |
nmdb/tipc.c | +490 | -0 |
nmdb/tipc.h | +34 | -0 |
python/LICENSE | +34 | -0 |
python/nmdb.py | +73 | -0 |
python/nmdb_ll.c | +302 | -0 |
python/setup.py | +17 | -0 |
diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..661eb2b --- /dev/null +++ b/LICENSE @@ -0,0 +1,13 @@ + +Each sub-project in nmdb source tree has an independant license, and can be +treated as independant projects from a licensing point of view. + +Licenses are always in a file named "LICENSE" in the top directory of each +sub-project, and apply to ALL the source code files in that directory tree (it +includes subdirectories). + +As a brief resume, here's how each sub-project is licensed: + * nmdb: OSL 3.0 + * lib: BOLA (Public domain) + * python: BOLA (Public domain) + diff --git a/README b/README new file mode 100644 index 0000000..31d28f4 --- /dev/null +++ b/README @@ -0,0 +1,49 @@ + +nmdb - A TIPC-based database manager +Alberto Bertogli (albertito@gmail.com) +----------------------------------------- + +nmdb is a network database that uses the TIPC protocol to communicate with +it's clients. + +It consists of an in-memory cache, that saves (key, value) pairs, and a +persistant backend that stores the pairs on disk. + +Both work combined, but the use of the persistant backend is optional, so you +can use the server only for cache queries, pretty much like memcached. + +This source distribution is composed of several parts: the server called +"nmdb", the library and the python bindings. Each one has a separate top level +directory, and is licensed individually. See the LICENSE file for more +information. + +For additional documentation and resources, go to the project's website at +http://auriga.wearlab.de/~alb/nmdb. + + +How to compile, test and install +-------------------------------- + +You will need to be running a kernel >= 2.6.16, and to have kernel headers +also >= 2.6.16. Alternatively, you can run a TIPC patched kernel, and have the +tipc.h header be put somewhere in the include path by hand. + +To compile the server and the library, you can just use "make" on their +directories. To install them, use "make install". 
+ +To test the application, start the server and then go to the library +directory. Run "make tests", and then use the "test1c" and "test2c" to test +the cache layer, and "test1d" and "test2d" to test the database backend. + +To compile the Python bindings, you need to have the library already +installed. Go to the directory and run "python setup.py install", which will +build and install the modules. The module will be named "nmdb". + + +Where to report bugs +-------------------- + +Please report any bugs, suggestions, issues or comments to me, Alberto +Bertogli, at albertito@gmail.com. + + diff --git a/TODO b/TODO new file mode 100644 index 0000000..d7be06f --- /dev/null +++ b/TODO @@ -0,0 +1,15 @@ + +Server: +* Passive mode +* Export stats +* Test it's endian-clean (it should work fine) + +Library: +* Update documentation +* Support multithreading and/with non-blocking API +* Automatic server selection + +Python: +* Write test-cases to test the binding, the library and the server +* Document the module + diff --git a/libnmdb/LICENSE b/libnmdb/LICENSE new file mode 100644 index 0000000..e92bf0d --- /dev/null +++ b/libnmdb/LICENSE @@ -0,0 +1,34 @@ + +I don't like licenses, because I don't like having to worry about all this +legal stuff just for a simple piece of software I don't really mind anyone +using. But I also believe that it's important that people share and give back; +so I'm placing this library under the following license, so you feel guilty if +you don't ;) + + +BOLA - Buena Onda License Agreement +----------------------------------- + +This work is provided 'as-is', without any express or implied warranty. In no +event will the authors be held liable for any damages arising from the use of +this work. + +To all effects and purposes, this work is to be considered Public Domain. + + +However, if you want to be "Buena onda", you should: + +1. Not take credit for it, and give proper recognition to the authors. +2. 
Share your modifications, so everybody benefits from them. +4. Do something nice for the authors. +5. Help someone who needs it: sign up for some volunteer work or help your + neighbour paint the house. +6. Don't waste. Anything, but specially energy that comes from natural + non-renovable resources. Extra points if you discover or invent something + to replace them. +7. Be tolerant. Everything that's good in nature comes from cooperation. + +The order is important, and the further you go the more "Buena onda" you are. +Make the world a better place: be "Buena onda". + + diff --git a/libnmdb/Makefile b/libnmdb/Makefile new file mode 100644 index 0000000..5007826 --- /dev/null +++ b/libnmdb/Makefile @@ -0,0 +1,70 @@ + +CFLAGS += -std=c99 -Wall -D_XOPEN_SOURCE=500 -O3 -fPIC + +ifdef DEBUG +CFLAGS += -g -pg -fprofile-arcs -ftest-coverage +endif + +ifdef STRICT +CFLAGS += -ansi -pedantic +endif + +# prefix for installing the binaries +PREFIX=/usr/local + + +OBJS = libnmdb.o test1c.o test1d.o test2c.o test2d.o + + +default: all + +all: libs + + +libs: libnmdb.so libnmdb.a + +libnmdb.so: libnmdb.o + $(CC) $(CFLAGS) libnmdb.o -shared -fPIC -o libnmdb.so + +libnmdb.a: libnmdb.o + $(AR) cr libnmdb.a libnmdb.o + + +tests: test1c test1d test2c test2d + +test1c: test1c.o libnmdb.a + $(CC) $(CFLAGS) test1c.o libnmdb.a -o test1c + +test1d: test1d.o libnmdb.a + $(CC) $(CFLAGS) test1d.o libnmdb.a -o test1d + +test2c: test2c.o libnmdb.a + $(CC) $(CFLAGS) test2c.o libnmdb.a -o test2c + +test2d: test2d.o libnmdb.a + $(CC) $(CFLAGS) test2d.o libnmdb.a -o test2d + + +install: libs + install -d $(PREFIX)/lib + install -m 0755 libnmdb.so $(PREFIX)/lib + install -m 0755 libnmdb.a $(PREFIX)/lib + install -d $(PREFIX)/include + install -m 0644 nmdb.h $(PREFIX)/include + install -d $(PREFIX)/man/man3 + install -m 0644 libnmdb.3 $(PREFIX)/man/man3/ + @echo + @echo "Please run ldconfig to update your library cache" + @echo + + +.c.o: + $(CC) $(CFLAGS) -c $< -o $@ + +clean: + rm -f $(OBJS) 
libnmdb.so libnmdb.a test1c test1d test2c test2d + rm -f *.bb *.bbg *.da *.gcov gmon.out + +.PHONY: default all libs tests install clean + + diff --git a/libnmdb/libnmdb.3 b/libnmdb/libnmdb.3 new file mode 100644 index 0000000..756940c --- /dev/null +++ b/libnmdb/libnmdb.3 @@ -0,0 +1,114 @@ +.TH libnmdb 3 "11/Sep/2006" +.SH NAME +libnmdb - Library for interacting with a nmdb server +.SH SYNOPSYS +.nf +.B #include <nmdb.h> +.sp +.BI "nmdb_t *nmdb_init(int " port ");" +.sp +.BI "int nmdb_free(nmdb_t *" db ");" +.sp +.BI "int nmdb_set(nmdb_t *" db "," +.BI " const unsigned char *" key ", size_t " ksize "," +.BI " const unsigned char *" val ", size_t " vsize ");" +.BI "int nmdb_set_async(nmdb_t *" db "," +.BI " const unsigned char *" key ", size_t " ksize "," +.BI " const unsigned char *" val ", size_t " vsize ");" +.BI "int nmdb_cache_set(nmdb_t *" db "," +.BI " const unsigned char *" key ", size_t " ksize "," +.BI " const unsigned char *" val ", size_t " vsize ");" +.sp +.BI "int nmdb_get(nmdb_t *" db "," +.BI " const unsigned char *" key ", size_t " ksize "," +.BI " unsigned char *" val ", size_t " vsize ");" +.BI "int nmdb_cache_get(nmdb_t *" db "," +.BI " const unsigned char *" key ", size_t " ksize "," +.BI " unsigned char *" val ", size_t " vsize ");" +.sp +.BI "int nmdb_del(nmdb_t *" db "," +.BI " const unsigned char *" key ", size_t " ksize ");" +.BI "int nmdb_del_async(nmdb_t *" db "," +.BI " const unsigned char *" key ", size_t " ksize ");" +.BI "int nmdb_cache_del(nmdb_t *" db "," +.BI " const unsigned char *" key ", size_t " ksize ");" +.fi +.SH DESCRIPTION + +libnmdb is a library for interacting with a nmdb(1) server. For more +information about it, see nmdb(1) or http://auriga.wearlab.de/~alb/nmdb. + +The first step to access a server is to call +.BR nmdb_init () +to initialize the data. It will return an opaque pointer of +.B nmdb_t +type, which well be used as the first argument for the rest of the functions +and identifies a connection to the server. 
It takes a single argument, which +is the TIPC port to use. If you are in doubt, use -1 which will use the +default value. + +To dispose a connection, use +.BR nmdb_free (). + +The functions often take +.I key +and +.I ksize +arguments, as well as +.I val +and +.IR vsize . +These represent a key and a value, respectively, along with their +corresponding sizes, in bytes. The following restrictions regarding the size +of the keys and values apply: keys can't exceed 64Kb, values can't exceed +64Kb, and the size of a key + the size of it's associated value can't exceed +64Kb. + +There are three kinds of operations: +.IR set , +.I get +and +.IR del , +with their obvious meaning. Each operation has variants, that make it behave +in a different way. All three have "cache" variants, that only affect the +cache and not the database; and +.I set +and +.I del +have "async" variants that make the operation return inmediately, leaving it +queued on the server for asynchronous execution. Note that in this case no +message is sent to the client when the operation completes. All variants +behave the same way as the original operation unless noted, so the following +descriptions document them implicitly. + +.BR nmdb_set () +is used to set the value associated with the given key. It returns 1 on +success, or < 0 on failure. + +.BR nmdb_get () +is used to retrieve the value for the given key, if there is any. +The +.I val +parameter should be pointing to a buffer where the value will be placed, and +.I vsize +will be the size of that buffer. It's highly recommended that the buffer is +greater than 64kb in size to make room for the largest possible value. It will +return the size of the retrieved key (which will be put in the buffer pointed +at by +.IR val ), +0 if the requested key was not in the database (or cache, if the cache variant +is used), or < 0 on failure. + +.BR nmdb_cache_del () +is used to remove a given key (and it's associated value). 
It returns 1 if it +was removed successfuly, 0 if the key was not in the database/cache, or < 0 on +failure. In the asynchronous variant, success is always indicated by 1, and no +distinction is made if the key was not in the database/cache. + +.SH SEE ALSO + +.BR nmdb (1), +.B TIPC +(http://tipc.sf.net). +.SH AUTHORS +Created by Alberto Bertogli (albertito@gmail.com). diff --git a/libnmdb/libnmdb.c b/libnmdb/libnmdb.c new file mode 100644 index 0000000..01f13b3 --- /dev/null +++ b/libnmdb/libnmdb.c @@ -0,0 +1,344 @@ + +#include <sys/types.h> /* socket defines */ +#include <sys/socket.h> /* socket functions */ +#include <stdlib.h> /* malloc() */ +#include <linux/tipc.h> /* tipc stuff */ +#include <stdint.h> /* uint32_t and friends */ +#include <arpa/inet.h> /* htonls() and friends */ +#include <string.h> /* memcpy() */ +#include <unistd.h> /* close() */ + +#include <stdio.h> + +#include "nmdb.h" +#include "net-const.h" + +/* The ID code for requests is hardcoded for now, until asynchronous requests + * are implemented. 
*/ +#define ID_CODE 1 + + +nmdb_t *nmdb_init(int port) +{ + int fd; + nmdb_t *db; + + db = malloc(sizeof(nmdb_t)); + if (db == NULL) { + return NULL; + } + + if (port < 0) + port = SERVER_INST; + + db->srvsa.family = AF_TIPC; + db->srvsa.addrtype = TIPC_ADDR_NAMESEQ; + db->srvsa.addr.nameseq.type = SERVER_TYPE; + db->srvsa.addr.nameseq.lower = port; + db->srvsa.addr.nameseq.upper = port; + db->srvsa.scope = TIPC_CLUSTER_SCOPE; + db->srvlen = (socklen_t) sizeof(db->srvsa); + + fd = socket(AF_TIPC, SOCK_RDM, 0); + if (fd < 0) { + free(db); + return NULL; + } + db->fd = fd; + + return db; +} + + +int nmdb_free(nmdb_t *db) +{ + close(db->fd); + free(db); + return 1; +} + + +static int srv_send(nmdb_t *db, const unsigned char *buf, size_t bsize) +{ + ssize_t rv; + rv = sendto(db->fd, buf, bsize, 0, (struct sockaddr *) &(db->srvsa), + db->srvlen); + if (rv <= 0) + return 0; + return 1; +} + +static ssize_t srv_recv(nmdb_t *db, unsigned char *buf, size_t bsize) +{ + ssize_t rv; + rv = recv(db->fd, buf, bsize, 0); + return rv; + +} + +static uint32_t get_rep(nmdb_t *db, unsigned char *buf, size_t bsize, + unsigned char **payload, size_t *psize) +{ + ssize_t t; + uint32_t id, reply; + + t = srv_recv(db, buf, bsize); + if (t < 4 + 4) { + return -1; + } + + id = * (uint32_t *) buf; + id = ntohl(id); + reply = * ((uint32_t *) buf + 1); + reply = ntohl(reply); + + if (id != ID_CODE) { + return -1; + } + + if (payload != NULL) { + *payload = buf + 4 + 4; + *psize = t - 4 - 4; + } + return reply; +} + + + +static ssize_t do_get(nmdb_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t vsize, int impact_db) +{ + ssize_t rv, t; + unsigned char *buf, *p; + size_t bsize, reqsize, psize; + uint32_t request, reply; + + if (impact_db) { + request = REQ_GET; + } else { + request = REQ_CACHE_GET; + } + + /* Use the same buffer for the request and the reply. + * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, + * ksize bytes key. 
+ * Reply: 4 bytes id, 4 bytes reply code, 4 bytes vsize, + * vsize bytes key. + * + * We don't know vsize beforehand, but we do know TIPC's max packet is + * 66000. We malloc 128k just in case. + */ + bsize = 128 * 1024; + buf = malloc(bsize); + if (buf == NULL) + return -1; + + * (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE ); + * ((uint32_t *) buf + 1) = htonl(request); + * ((uint32_t *) buf + 2) = htonl(ksize); + p = buf + 3 * 4; + memcpy(p, key, ksize); + reqsize = 3 * 4 + ksize; + + t = srv_send(db, buf, reqsize); + if (t <= 0) { + rv = -1; + goto exit; + } + + reply = get_rep(db, buf, bsize, &p, &psize); + + if (reply == REP_CACHE_MISS || reply == REP_NOTIN) { + rv = 0; + goto exit; + } else if (reply == REP_ERR) { + rv = -1; + goto exit; + } else if (reply != REP_OK && reply != REP_CACHE_HIT) { + /* invalid response */ + rv = -1; + goto exit; + } + + /* we've got an answer (either REP_OK or REP_CACHE_HIT) */ + rv = * (uint32_t *) p; + rv = ntohl(rv); + if (rv > (psize - 4) || rv > vsize) { + rv = 0; + goto exit; + } + memcpy(val, p + 4, rv); + +exit: + free(buf); + return rv; +} + +ssize_t nmdb_get(nmdb_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t vsize) +{ + return do_get(db, key, ksize, val, vsize, 1); +} + +ssize_t nmdb_cache_get(nmdb_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t vsize) +{ + return do_get(db, key, ksize, val, vsize, 0); +} + + + +static int do_set(nmdb_t *db, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize, + int impact_db, int async) +{ + ssize_t rv, t; + unsigned char *buf, *p; + size_t bsize; + uint32_t request, reply; + + if (impact_db) { + if (async) + request = REQ_SET_ASYNC; + else + request = REQ_SET; + } else { + request = REQ_CACHE_SET; + } + + + /* Use the same buffer for the request and the reply. + * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, 4 + * bytes vsize, ksize bytes key, vsize bytes val. 
+ * Reply: 4 bytes id, 4 bytes reply code. + */ + bsize = 4 + 4 + 4 + 4 + ksize + vsize; + buf = malloc(bsize); + if (buf == NULL) + return -1; + + * (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE ); + * ((uint32_t *) buf + 1) = htonl(request); + * ((uint32_t *) buf + 2) = htonl(ksize); + * ((uint32_t *) buf + 3) = htonl(vsize); + p = buf + 4 * 4; + memcpy(p, key, ksize); + p += ksize; + memcpy(p, val, vsize); + + t = srv_send(db, buf, bsize); + if (t <= 0) { + rv = -1; + goto exit; + } + + reply = get_rep(db, buf, bsize, NULL, NULL); + + if (reply == REP_OK) { + rv = 1; + goto exit; + } + + /* REP_ERR or invalid response */ + rv = -1; + +exit: + free(buf); + return rv; + +} + +int nmdb_set(nmdb_t *db, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize) +{ + return do_set(db, key, ksize, val, vsize, 1, 0); +} + +int nmdb_set_async(nmdb_t *db, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize) +{ + return do_set(db, key, ksize, val, vsize, 1, 1); +} + +int nmdb_cache_set(nmdb_t *db, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize) +{ + return do_set(db, key, ksize, val, vsize, 0, 0); +} + + + +int do_del(nmdb_t *db, const unsigned char *key, size_t ksize, + int impact_db, int async) +{ + ssize_t rv, t; + unsigned char *buf; + size_t bsize; + uint32_t request, reply; + + if (impact_db) { + if (async) + request = REQ_DEL_ASYNC; + else + request = REQ_DEL; + } else { + request = REQ_CACHE_DEL; + } + + + /* Use the same buffer for the request and the reply. + * Request: 4 bytes ver+id, 4 bytes request code, 4 bytes ksize, + * ksize bytes key. + * Reply: 4 bytes id, 4 bytes reply code. 
+ */ + bsize = 8 + 4 + ksize; + buf = malloc(bsize); + if (buf == NULL) + return -1; + + * (uint32_t *) buf = htonl( (PROTO_VER << 28) | ID_CODE ); + * ((uint32_t *) buf + 1) = htonl(request); + * ((uint32_t *) buf + 2) = htonl(ksize); + memcpy(buf + 3 * 4, key, ksize); + + t = srv_send(db, buf, bsize); + if (t <= 0) { + rv = -1; + goto exit; + } + + reply = get_rep(db, buf, bsize, NULL, NULL); + + if (reply == REP_OK) { + rv = 1; + goto exit; + } else if (reply == REP_NOTIN) { + rv = 0; + goto exit; + } + + /* REP_ERR or invalid response */ + rv = -1; + +exit: + free(buf); + return rv; + +} + +int nmdb_del(nmdb_t *db, const unsigned char *key, size_t ksize) +{ + return do_del(db, key, ksize, 1, 0); +} + +int nmdb_del_async(nmdb_t *db, const unsigned char *key, size_t ksize) +{ + return do_del(db, key, ksize, 1, 1); +} + +int nmdb_cache_del(nmdb_t *db, const unsigned char *key, size_t ksize) +{ + return do_del(db, key, ksize, 0, 0); +} + + diff --git a/libnmdb/net-const.h b/libnmdb/net-const.h new file mode 100644 index 0000000..159de4b --- /dev/null +++ b/libnmdb/net-const.h @@ -0,0 +1,43 @@ + +#ifndef _NET_CONST_H +#define _NET_CONST_H + +/* + * Local network constants. + * Isolated so it's shared between the server and the library code. + */ + +/* TIPC server type and instance -- Hardcoded for now. */ +#define SERVER_TYPE 26001 +#define SERVER_INST 10 + +/* Protocol version, for checking in the network header. 
*/ +#define PROTO_VER 1 + +/* Network requests */ +#define REQ_CACHE_GET 0x101 +#define REQ_CACHE_SET 0x102 +#define REQ_CACHE_DEL 0x103 +#define REQ_GET 0x104 +#define REQ_SET 0x105 +#define REQ_DEL 0x106 +#define REQ_SET_ASYNC 0x107 +#define REQ_DEL_ASYNC 0x108 + +/* Network replies (different namespace from requests) */ +#define REP_ERR 0x100 +#define REP_CACHE_HIT 0x101 +#define REP_CACHE_MISS 0x102 +#define REP_OK 0x103 +#define REP_NOTIN 0x104 + +/* Network error replies */ +#define ERR_VER 0x101 /* Version mismatch */ +#define ERR_SEND 0x102 /* Error sending data */ +#define ERR_BROKEN 0x103 /* Broken request */ +#define ERR_UNKREQ 0x104 /* Unknown request */ +#define ERR_MEM 0x105 /* Memory allocation error */ +#define ERR_DB 0x106 /* Database error */ + +#endif + diff --git a/libnmdb/nmdb.h b/libnmdb/nmdb.h new file mode 100644 index 0000000..e31a44b --- /dev/null +++ b/libnmdb/nmdb.h @@ -0,0 +1,35 @@ + +#ifndef _NMDB_H +#define _NMDB_H + +#include <sys/types.h> /* socket defines */ +#include <sys/socket.h> /* socklen_t */ +#include <linux/tipc.h> /* struct sockaddr_tipc */ + +typedef struct nmdb_t { + int fd; + struct sockaddr_tipc srvsa; + socklen_t srvlen; +} nmdb_t; + +nmdb_t *nmdb_init(int port); +int nmdb_free(nmdb_t *db); + +ssize_t nmdb_get(nmdb_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t vsize); +ssize_t nmdb_cache_get(nmdb_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t vsize); + +int nmdb_set(nmdb_t *db, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize); +int nmdb_set_async(nmdb_t *db, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize); +int nmdb_cache_set(nmdb_t *db, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize); + +int nmdb_del(nmdb_t *db, const unsigned char *key, size_t ksize); +int nmdb_del_async(nmdb_t *db, const unsigned char *key, size_t ksize); +int nmdb_cache_del(nmdb_t *db, 
const unsigned char *key, size_t ksize); + +#endif + diff --git a/libnmdb/test1c.c b/libnmdb/test1c.c new file mode 100644 index 0000000..8b356ac --- /dev/null +++ b/libnmdb/test1c.c @@ -0,0 +1,84 @@ + +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <sys/time.h> +#include <stdlib.h> + +#include "nmdb.h" +#include "timer.h" + + +int main(int argc, char **argv) +{ + int i, r, times; + unsigned char *key, *val; + size_t ksize, vsize; + unsigned long elapsed, misses = 0; + nmdb_t *db; + + if (argc != 4) { + printf("Usage: test1 TIMES KEY VAL\n"); + return 1; + } + + times = atoi(argv[1]); + key = (unsigned char *) argv[2]; + ksize = strlen((char *) key); + val = (unsigned char *) argv[3]; + vsize = strlen((char *) val); + + db = nmdb_init(-1); + if (db == NULL) { + perror("nmdb_init() failed"); + return 1; + } + + printf("set... "); + timer_start(); + for (i = 0; i < times; i++) { + r = nmdb_cache_set(db, key, ksize, val, vsize); + if (r < 0) { + perror("Set"); + return 1; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + + val = malloc(128 * 1024); + printf("get... "); + timer_start(); + for (i = 0; i < times; i++) { + r = nmdb_cache_get(db, key, ksize, val, vsize); + if (r < 0) { + perror("Get"); + return 1; + } else if (r == 0) { + misses++; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + free(val); + + printf("get misses: %ld\n", misses); + + printf("del... 
"); + timer_start(); + for (i = 0; i < times; i++) { + r = nmdb_cache_del(db, key, ksize); + if (r < 0) { + perror("Del"); + return 1; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + + + nmdb_free(db); + + return 0; +} + diff --git a/libnmdb/test1d.c b/libnmdb/test1d.c new file mode 100644 index 0000000..cdc526e --- /dev/null +++ b/libnmdb/test1d.c @@ -0,0 +1,84 @@ + +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <sys/time.h> +#include <stdlib.h> + +#include "nmdb.h" +#include "timer.h" + + +int main(int argc, char **argv) +{ + int i, r, times; + unsigned char *key, *val; + size_t ksize, vsize; + unsigned long elapsed, misses = 0; + nmdb_t *db; + + if (argc != 4) { + printf("Usage: test1 TIMES KEY VAL\n"); + return 1; + } + + times = atoi(argv[1]); + key = (unsigned char *) argv[2]; + ksize = strlen((char *) key); + val = (unsigned char *) argv[3]; + vsize = strlen((char *) val); + + db = nmdb_init(-1); + if (db == NULL) { + perror("nmdb_init() failed"); + return 1; + } + + printf("set... "); + timer_start(); + for (i = 0; i < times; i++) { + r = nmdb_set(db, key, ksize, val, vsize); + if (r < 0) { + perror("Set"); + return 1; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + + val = malloc(128 * 1024); + printf("get... "); + timer_start(); + for (i = 0; i < times; i++) { + r = nmdb_get(db, key, ksize, val, vsize); + if (r < 0) { + perror("Get"); + return 1; + } else if (r == 0) { + misses++; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + free(val); + + printf("get misses: %ld\n", misses); + + printf("del... 
"); + timer_start(); + for (i = 0; i < times; i++) { + r = nmdb_del(db, key, ksize); + if (r < 0) { + perror("Del"); + return 1; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + + + nmdb_free(db); + + return 0; +} + diff --git a/libnmdb/test2c.c b/libnmdb/test2c.c new file mode 100644 index 0000000..0d98824 --- /dev/null +++ b/libnmdb/test2c.c @@ -0,0 +1,92 @@ + +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <sys/time.h> +#include <stdlib.h> + +#include "nmdb.h" +#include "timer.h" + + +int main(int argc, char **argv) +{ + int i, r, times; + unsigned char *key, *val; + size_t ksize, vsize; + unsigned long elapsed, misses = 0; + nmdb_t *db; + + if (argc != 4) { + printf("Usage: test2 TIMES KSIZE VSIZE\n"); + return 1; + } + + times = atoi(argv[1]); + ksize = atoi(argv[2]); + vsize = atoi(argv[3]); + key = malloc(ksize); + memset(key, 0, ksize); + val = malloc(vsize); + memset(val, 0, vsize); + + db = nmdb_init(-1); + if (db == NULL) { + perror("nmdb_init() failed"); + return 1; + } + + printf("set... "); + timer_start(); + for (i = 0; i < times; i++) { + * (int *) key = i; + * (int *) val = i; + r = nmdb_cache_set(db, key, ksize, val, vsize); + if (r < 0) { + perror("Set"); + return 1; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + + memset(key, 0, ksize); + free(val); + val = malloc(128 * 1024); + printf("get... "); + timer_start(); + for (i = 0; i < times; i++) { + * (int *) key = i; + r = nmdb_cache_get(db, key, ksize, val, vsize); + if (r < 0) { + perror("Get"); + return 1; + } else if (r == 0) { + misses++; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + free(val); + + printf("get misses: %ld\n", misses); + + printf("del... 
"); + timer_start(); + for (i = 0; i < times; i++) { + * (int *) key = i; + r = nmdb_cache_del(db, key, ksize); + if (r < 0) { + perror("Del"); + return 1; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + + free(key); + nmdb_free(db); + + return 0; +} + diff --git a/libnmdb/test2d.c b/libnmdb/test2d.c new file mode 100644 index 0000000..8ee48b5 --- /dev/null +++ b/libnmdb/test2d.c @@ -0,0 +1,92 @@ + +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <sys/time.h> +#include <stdlib.h> + +#include "nmdb.h" +#include "timer.h" + + +int main(int argc, char **argv) +{ + int i, r, times; + unsigned char *key, *val; + size_t ksize, vsize; + unsigned long elapsed, misses = 0; + nmdb_t *db; + + if (argc != 4) { + printf("Usage: test2 TIMES KSIZE VSIZE\n"); + return 1; + } + + times = atoi(argv[1]); + ksize = atoi(argv[2]); + vsize = atoi(argv[3]); + key = malloc(ksize); + memset(key, 0, ksize); + val = malloc(vsize); + memset(val, 0, vsize); + + db = nmdb_init(-1); + if (db == NULL) { + perror("nmdb_init() failed"); + return 1; + } + + printf("set... "); + timer_start(); + for (i = 0; i < times; i++) { + * (int *) key = i; + * (int *) val = i; + r = nmdb_set(db, key, ksize, val, vsize); + if (r < 0) { + perror("Set"); + return 1; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + + memset(key, 0, ksize); + free(val); + val = malloc(128 * 1024); + printf("get... "); + timer_start(); + for (i = 0; i < times; i++) { + * (int *) key = i; + r = nmdb_get(db, key, ksize, val, vsize); + if (r < 0) { + perror("Get"); + return 1; + } else if (r == 0) { + misses++; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + free(val); + + printf("get misses: %ld\n", misses); + + printf("del... 
"); + timer_start(); + for (i = 0; i < times; i++) { + * (int *) key = i; + r = nmdb_del(db, key, ksize); + if (r < 0) { + perror("Del"); + return 1; + } + } + elapsed = timer_stop(); + printf("%lu\n", elapsed); + + free(key); + nmdb_free(db); + + return 0; +} + diff --git a/libnmdb/timer.h b/libnmdb/timer.h new file mode 100644 index 0000000..75315c5 --- /dev/null +++ b/libnmdb/timer.h @@ -0,0 +1,39 @@ + +/* + * A simple timer for measuring delays. + * Alberto Bertogli (albertito@gmail.com) - September/2006 + * + * Use it like this: + * unsigned long elapsed; + * ... + * timer_start(); + * ... [code] ... + * elapsed = timer_stop(); + * ... + * printf("Time elapsed: %ul", elapsed); + * + * Nested timers are not supported. The result is in usecs. + * + * 1000000 usecs == 1 sec + */ + + +#ifndef _TIMER_H +#define _TIMER_H + +#include <sys/time.h> + +static struct timeval tv_s, tv_e; + +static void timer_start() { + gettimeofday(&tv_s, NULL); +} + +static unsigned long timer_stop() { + gettimeofday(&tv_e, NULL); + return (tv_e.tv_sec - tv_s.tv_sec) * 1000000ul + + (tv_e.tv_usec - tv_s.tv_usec); +} + +#endif + diff --git a/nmdb/LICENSE b/nmdb/LICENSE new file mode 100644 index 0000000..0ed89f2 --- /dev/null +++ b/nmdb/LICENSE @@ -0,0 +1,184 @@ + +This project, "nmdb", is copyrighted by Alberto Bertogli and licensed under +the Open Software License version 3.0 as obtained from +http://www.opensource.org (and included here-in for easy reference) (that +license itself is copyrighted by Lawrence Rosen). + + Alberto Bertogli + 11/September/2006 + +----------------------------------------------------------------------------- + + +Open Software License (“OSL”) v. 
3.0 + +This Open Software License (the "License") applies to any original work of +authorship (the "Original Work") whose owner (the "Licensor") has placed the +following licensing notice adjacent to the copyright notice for the Original +Work: + +Licensed under the Open Software License version 3.0 + +1) Grant of Copyright License. Licensor grants You a worldwide, royalty-free, +non-exclusive, sublicensable license, for the duration of the copyright, to do +the following: + +a) to reproduce the Original Work in copies, either alone or as part of a +collective work; + +b) to translate, adapt, alter, transform, modify, or arrange the Original +Work, thereby creating derivative works ("Derivative Works") based upon the +Original Work; + +c) to distribute or communicate copies of the Original Work and Derivative +Works to the public, with the proviso that copies of Original Work or +Derivative Works that You distribute or communicate shall be licensed under +this Open Software License; + +d) to perform the Original Work publicly; and + +e) to display the Original Work publicly. + +2) Grant of Patent License. Licensor grants You a worldwide, royalty-free, +non-exclusive, sublicensable license, under patent claims owned or controlled +by the Licensor that are embodied in the Original Work as furnished by the +Licensor, for the duration of the patents, to make, use, sell, offer for sale, +have made, and import the Original Work and Derivative Works. + +3) Grant of Source Code License. The term "Source Code" means the preferred +form of the Original Work for making modifications to it and all available +documentation describing how to modify the Original Work. Licensor agrees to +provide a machine-readable copy of the Source Code of the Original Work along +with each copy of the Original Work that Licensor distributes. 
Licensor +reserves the right to satisfy this obligation by placing a machine-readable +copy of the Source Code in an information repository reasonably calculated to +permit inexpensive and convenient access by You for as long as Licensor +continues to distribute the Original Work. + +4) Exclusions From License Grant. Neither the names of Licensor, nor the names +of any contributors to the Original Work, nor any of their trademarks or +service marks, may be used to endorse or promote products derived from this +Original Work without express prior permission of the Licensor. Except as +expressly stated herein, nothing in this License grants any license to +Licensor’s trademarks, copyrights, patents, trade secrets or any other +intellectual property. No patent license is granted to make, use, sell, offer +for sale, have made, or import embodiments of any patent claims other than the +licensed claims defined in Section 2. No license is granted to the trademarks +of Licensor even if such marks are included in the Original Work. Nothing in +this License shall be interpreted to prohibit Licensor from licensing under +terms different from this License any Original Work that Licensor otherwise +would have a right to license. + +5) External Deployment. The term "External Deployment" means the use, +distribution, or communication of the Original Work or Derivative Works in any +way such that the Original Work or Derivative Works may be used by anyone +other than You, whether those works are distributed or communicated to those +persons or made available as an application intended for use over a network. +As an express condition for the grants of license hereunder, You must treat +any External Deployment by You of the Original Work or a Derivative Work as a +distribution under section 1(c). + +6) Attribution Rights. 
You must retain, in the Source Code of any Derivative +Works that You create, all copyright, patent, or trademark notices from the +Source Code of the Original Work, as well as any notices of licensing and any +descriptive text identified therein as an "Attribution Notice." You must cause +the Source Code for any Derivative Works that You create to carry a prominent +Attribution Notice reasonably calculated to inform recipients that You have +modified the Original Work. + +7) Warranty of Provenance and Disclaimer of Warranty. Licensor warrants that +the copyright in and to the Original Work and the patent rights granted herein +by Licensor are owned by the Licensor or are sublicensed to You under the +terms of this License with the permission of the contributor(s) of those +copyrights and patent rights. Except as expressly stated in the immediately +preceding sentence, the Original Work is provided under this License on an "AS +IS" BASIS and WITHOUT WARRANTY, either express or implied, including, without +limitation, the warranties of non-infringement, merchantability or fitness for +a particular purpose. THE ENTIRE RISK AS TO THE QUALITY OF THE ORIGINAL WORK +IS WITH YOU. This DISCLAIMER OF WARRANTY constitutes an essential part of this +License. No license to the Original Work is granted by this License except +under this disclaimer. + +8) Limitation of Liability. Under no circumstances and under no legal theory, +whether in tort (including negligence), contract, or otherwise, shall the +Licensor be liable to anyone for any indirect, special, incidental, or +consequential damages of any character arising as a result of this License or +the use of the Original Work including, without limitation, damages for loss +of goodwill, work stoppage, computer failure or malfunction, or any and all +other commercial damages or losses. This limitation of liability shall not +apply to the extent applicable law prohibits such limitation. + +9) Acceptance and Termination. 
If, at any time, You expressly assented to this +License, that assent indicates your clear and irrevocable acceptance of this +License and all of its terms and conditions. If You distribute or communicate +copies of the Original Work or a Derivative Work, You must make a reasonable +effort under the circumstances to obtain the express assent of recipients to +the terms of this License. This License conditions your rights to undertake +the activities listed in Section 1, including your right to create Derivative +Works based upon the Original Work, and doing so without honoring these terms +and conditions is prohibited by copyright law and international treaty. +Nothing in this License is intended to affect copyright exceptions and +limitations (including “fair use” or “fair dealing”). This License shall +terminate immediately and You may no longer exercise any of the rights granted +to You by this License upon your failure to honor the conditions in Section +1(c). + +10) Termination for Patent Action. This License shall terminate automatically +and You may no longer exercise any of the rights granted to You by this +License as of the date You commence an action, including a cross-claim or +counterclaim, against Licensor or any licensee alleging that the Original Work +infringes a patent. This termination provision shall not apply for an action +alleging patent infringement by combinations of the Original Work with other +software or hardware. + +11) Jurisdiction, Venue and Governing Law. Any action or suit relating to this +License may be brought only in the courts of a jurisdiction wherein the +Licensor resides or in which Licensor conducts its primary business, and under +the laws of that jurisdiction excluding its conflict-of-law provisions. The +application of the United Nations Convention on Contracts for the +International Sale of Goods is expressly excluded. 
Any use of the Original +Work outside the scope of this License or after its termination shall be +subject to the requirements and penalties of copyright or patent law in the +appropriate jurisdiction. This section shall survive the termination of this +License. + +12) Attorneys’ Fees. In any action to enforce the terms of this License or +seeking damages relating thereto, the prevailing party shall be entitled to +recover its costs and expenses, including, without limitation, reasonable +attorneys' fees and costs incurred in connection with such action, including +any appeal of such action. This section shall survive the termination of this +License. + +13) Miscellaneous. If any provision of this License is held to be +unenforceable, such provision shall be reformed only to the extent necessary +to make it enforceable. + +14) Definition of "You" in This License. "You" throughout this License, +whether in upper or lower case, means an individual or a legal entity +exercising rights under, and complying with all of the terms of, this License. +For legal entities, "You" includes any entity that controls, is controlled by, +or is under common control with you. For purposes of this definition, +"control" means (i) the power, direct or indirect, to cause the direction or +management of such entity, whether by contract or otherwise, or (ii) ownership +of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial +ownership of such entity. + +15) Right to Use. You may use the Original Work in all ways not otherwise +restricted or conditioned by this License or by law, and Licensor promises not +to interfere with or be responsible for such uses by You. + +16) Modification of This License. This License is Copyright © 2005 Lawrence +Rosen. Permission is granted to copy, distribute, or communicate this License +without modification. Nothing in this License permits You to modify this +License as applied to the Original Work or to Derivative Works. 
However, You +may modify the text of this License and copy, distribute or communicate your +modified version (the "Modified License") and apply it to other original works +of authorship subject to the following conditions: (i) You may not indicate in +any way that your Modified License is the "Open Software License" or "OSL" and +you may not use those names in the name of your Modified License; (ii) You +must replace the notice specified in the first paragraph above with the notice +"Licensed under <insert your license name here>" or with a notice of your own +that is not confusingly similar to the notice in this License; and (iii) You +may not claim that your original works are open source software unless your +Modified License has been approved by Open Source Initiative (OSI) and You +comply with its license review and certification process. diff --git a/nmdb/Makefile b/nmdb/Makefile new file mode 100644 index 0000000..315609c --- /dev/null +++ b/nmdb/Makefile @@ -0,0 +1,40 @@ + +CFLAGS += -std=c99 -Wall -D_XOPEN_SOURCE=500 -O3 + +ifdef DEBUG +CFLAGS += -g -pg -fprofile-arcs -ftest-coverage +endif + +ifdef STRICT +CFLAGS += -ansi -pedantic +endif + +# prefix for installing the binaries +PREFIX=/usr/local + + +OBJS = be-qdbm.o cache.o db.o queue.o net.o tipc.o main.o + +default: all + +all: nmdb + +nmdb: $(OBJS) + $(CC) $(CFLAGS) $(OBJS) -levent -lpthread -lqdbm -o nmdb + +.c.o: + $(CC) $(CFLAGS) -c $< -o $@ + +install: all + install -d $(PREFIX)/bin + install -m 0755 nmdb $(PREFIX)/bin + install -d $(PREFIX)/man/man1 + install -m 0644 nmdb.1 $(PREFIX)/man/man1/ + +clean: + rm -f $(OBJS) nmdb + rm -f *.bb *.bbg *.da *.gcov gmon.out + +# targets that never create a file with their own name +.PHONY: default all install clean + + diff --git a/nmdb/be-qdbm.c b/nmdb/be-qdbm.c new file mode 100644 index 0000000..d247d1d --- /dev/null +++ b/nmdb/be-qdbm.c @@ -0,0 +1,48 @@ + +#include <depot.h> /* QDBM's Depot API */ +#include <stdlib.h> + +#include "be.h" + + +/* Opens the Depot database 'name' with DP_OREADER | DP_OWRITER | DP_ONOLCK. + * NOTE: 'flags' is currently ignored. */ +db_t *db_open(const char *name, int flags) +{ + int f; + + f = DP_OREADER
| DP_OWRITER | DP_ONOLCK; + return dpopen(name, f, 0); +} + + +int db_close(db_t *db) +{ + return dpclose(db); +} + + +int db_set(db_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t vsize) +{ + return dpput(db, (char *) key, ksize, (char *) val, vsize, DP_DOVER); +} + + +/* Reads the value stored for 'key' into 'val', which has room for *vsize + * bytes. Returns 1 and sets *vsize to the number of bytes read, or 0 if the + * lookup failed. */ +int db_get(db_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t *vsize) +{ + int rv; + + /* NOTE(review): at most *vsize bytes are requested, so a longer value + * seems to come back truncated rather than as an error -- confirm + * against the QDBM dpgetwb() documentation. */ + rv = dpgetwb(db, (char *) key, ksize, 0, *vsize, (char *) val); + if (rv >= 0) { + *vsize = rv; + return 1; + } else { + return 0; + } +} + +int db_del(db_t *db, const unsigned char *key, size_t ksize) +{ + return dpout(db, (char *) key, ksize); +} + diff --git a/nmdb/be.h b/nmdb/be.h new file mode 100644 index 0000000..246f3de --- /dev/null +++ b/nmdb/be.h @@ -0,0 +1,21 @@ + +#ifndef _BE_H +#define _BE_H + +/* The following should be specific to the db backend we use. As we only + * handle qdbm for now, there's no need to play with #ifdefs. */ +#include <depot.h> +typedef DEPOT db_t; + + +db_t *db_open(const char *name, int flags); +int db_close(db_t *db); +int db_set(db_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t vsize); +int db_get(db_t *db, const unsigned char *key, size_t ksize, + unsigned char *val, size_t *vsize); +int db_del(db_t *db, const unsigned char *key, size_t ksize); + +#endif + + diff --git a/nmdb/cache.c b/nmdb/cache.c new file mode 100644 index 0000000..a5a9806 --- /dev/null +++ b/nmdb/cache.c @@ -0,0 +1,308 @@ + +/* Generic cache layer. + * It's a hash table with cache-style properties, keeping a (non-precise) size + * and using a natural, per-chain LRU to do cleanups. + * Cleanups are performed in place, when cache_set() gets called.
+ */ + +#include <sys/types.h> /* for size_t */ +#include <stdint.h> /* for uint32_t */ +#include <stdlib.h> /* for malloc() */ +#include <string.h> /* for memcpy()/memcmp() */ +#include "cache.h" + +struct cache *cache_create(size_t numobjs, unsigned int flags) +{ + size_t i; + struct cache *cd; + struct cache_chain *c; + + cd = (struct cache *) malloc(sizeof(struct cache)); + if (cd == NULL) + return NULL; + + cd->numobjs = numobjs; + cd->flags = flags; + + /* We calculate the hash size so we have 4 objects per bucket; 4 being + * an arbitrary number. It's long enough to make LRU useful, and small + * enough to make lookups fast. */ + cd->chainlen = 4; + cd->hashlen = numobjs / cd->chainlen; + + /* Lookups compute 'hash % hashlen', so tiny caches (numobjs < chainlen) + * still need one chain to avoid a division by zero. */ + if (cd->hashlen == 0) + cd->hashlen = 1; + + cd->table = (struct cache_chain *) + malloc(sizeof(struct cache_chain) * cd->hashlen); + if (cd->table == NULL) { + free(cd); + return NULL; + } + + for (i = 0; i < cd->hashlen; i++) { + c = cd->table + i; + c->len = 0; + c->first = NULL; + c->last = NULL; + } + + return cd; +} + + +int cache_free(struct cache *cd) +{ + size_t i; + struct cache_chain *c; + struct cache_entry *e, *n; + + for (i = 0; i < cd->hashlen; i++) { + c = cd->table + i; + if (c->first == NULL) + continue; + + e = c->first; + while (e != NULL) { + n = e->next; + free(e->key); + free(e->val); + free(e); + e = n; + } + } + + free(cd->table); + free(cd); + return 1; +} + + +/* + * The hash function used is the "One at a time" function, which seems simple, + * fast and popular.
Others for future consideration if speed becomes an issue + * include: + * * FNV Hash (http://www.isthe.com/chongo/tech/comp/fnv/) + * * SuperFastHash (http://www.azillionmonkeys.com/qed/hash.html) + * * Judy dynamic arrays (http://judy.sf.net) + * + * A good comparison can be found at + * http://eternallyconfuzzled.com/tuts/hashing.html#existing + */ + +static uint32_t hash(const unsigned char *key, const size_t ksize) +{ + uint32_t h = 0; /* unsigned 32-bit: results below wrap modulo 2^32 */ + size_t i; + + for (i = 0; i < ksize; i++) { + h += key[i]; + h += (h << 10); + h ^= (h >> 6); + } + h += (h << 3); /* final mixing, once every key byte was added */ + h ^= (h >> 11); + h += (h << 15); + return h; +} + + +/* + * Looks given key up in the chain. + * Returns NULL if not found, or a pointer to the cache entry if it's found. + * The chain can be empty. + * Used in cache_get(), cache_set() and cache_del(). + */ +static struct cache_entry *find_in_chain(struct cache_chain *c, + const unsigned char *key, size_t ksize) +{ + int found = 0; + struct cache_entry *e; + + e = c->first; + while (e != NULL) { + if (ksize != e->ksize) { /* cheap size check before memcmp() */ + e = e->next; + continue; + } + if (memcmp(key, e->key, ksize) == 0) { + found = 1; + break; + } + + e = e->next; + } + + if (found) + return e; + return NULL; + +} + +/* + * Gets the matching value for the given key. + * Returns 0 if no match was found, or 1 otherwise.
+ */ +int cache_get(struct cache *cd, const unsigned char *key, size_t ksize, + unsigned char **val, size_t *vsize) +{ + uint32_t h = 0; + struct cache_chain *c; + struct cache_entry *e; + + h = hash(key, ksize) % cd->hashlen; + c = cd->table + h; + + e = find_in_chain(c, key, ksize); + + if (e == NULL) { + *val = NULL; + *vsize = 0; + return 0; + } + + /* No copy is made: *val points into the cache itself, and is freed when + * the entry is replaced or evicted (cache_set()), deleted (cache_del()) + * or when the whole cache is freed. */ + *val = e->val; + *vsize = e->vsize; + + return 1; +} + + +int cache_set(struct cache *cd, const unsigned char *key, size_t ksize, + const unsigned char *val, size_t vsize) +{ + int rv = 1; + uint32_t h = 0; + struct cache_chain *c; + struct cache_entry *e, *new; + unsigned char *v; + + h = hash(key, ksize) % cd->hashlen; + c = cd->table + h; + + e = find_in_chain(c, key, ksize); + + if (e == NULL) { + /* not found, create a new cache entry */ + new = malloc(sizeof(struct cache_entry)); + if (new == NULL) { + rv = 0; + goto exit; + } + + new->ksize = ksize; + new->vsize = vsize; + + new->key = malloc(ksize); + if (new->key == NULL) { + free(new); + rv = 0; + goto exit; + } + memcpy(new->key, key, ksize); + + new->val = malloc(vsize); + if (new->val == NULL) { + free(new->key); + free(new); + rv = 0; + goto exit; + } + memcpy(new->val, val, vsize); + new->prev = NULL; + new->next = NULL; + + /* and put it in */ + if (c->len == 0) { + /* line is empty, just put it there */ + c->first = new; + c->last = new; + c->len = 1; + } else if (c->len < cd->chainlen) { + /* slots are still available, put the entry first; '<' keeps + * each chain at no more than chainlen entries */ + new->next = c->first; + c->first->prev = new; + c->first = new; + c->len += 1; + } else { + /* chain is full, we need to evict the last one */ + e = c->last; + c->last = e->prev; + c->last->next = NULL; + free(e->key); + free(e->val); + free(e); + + new->next = c->first; + c->first->prev = new; + c->first = new; + } + } else { + /* we've got a match, just replace the value in place */ + v = malloc(vsize); + if (v == NULL) { + rv = 0; + goto exit; + } + free(e->val); + e->val = v; + memcpy(e->val, val,
 vsize); + e->vsize = vsize; /* keep the stored size in sync with the new value */ + + /* promote the entry to the top of the list if necessary */ + if (c->first != e) { + if (c->last == e) + c->last = e->prev; + + e->prev->next = e->next; + if (e->next != NULL) + e->next->prev = e->prev; + e->prev = NULL; + e->next = c->first; + c->first->prev = e; + c->first = e; + } + } + +exit: + return rv; +} + + +int cache_del(struct cache *cd, const unsigned char *key, size_t ksize) +{ + + int rv = 1; + uint32_t h = 0; + struct cache_chain *c; + struct cache_entry *e; + + h = hash(key, ksize) % cd->hashlen; + c = cd->table + h; + + e = find_in_chain(c, key, ksize); + + if (e == NULL) { + rv = 0; + goto exit; + } + + if (c->first == e) { + c->first = e->next; + if (e->next != NULL) + e->next->prev = NULL; + } else { + e->prev->next = e->next; + if (e->next != NULL) + e->next->prev = e->prev; + } + + if (c->last == e) { + c->last = e->prev; + } + + free(e->key); + free(e->val); + free(e); + + c->len -= 1; + +exit: + return rv; +} + diff --git a/nmdb/cache.h b/nmdb/cache.h new file mode 100644 index 0000000..fd31900 --- /dev/null +++ b/nmdb/cache.h @@ -0,0 +1,49 @@ + +#ifndef _CACHE_H +#define _CACHE_H + +/* Generic cache layer. See cache.c for more information.
+ */ + +#include <sys/types.h> /* for size_t */ + + +struct cache { + /* set directly by initialization */ + size_t numobjs; + unsigned int flags; + + /* calculated */ + size_t hashlen; /* number of chains in 'table' */ + size_t chainlen; /* chain length cache_set() compares against to decide eviction */ + + /* the cache data itself */ + struct cache_chain *table; /* array of hashlen chains */ +}; + +struct cache_chain { + size_t len; /* number of entries in this chain */ + struct cache_entry *first; /* most recently inserted or updated entry */ + struct cache_entry *last; /* oldest entry, evicted first when the chain is full */ +}; + +struct cache_entry { + unsigned char *key; /* private copy of the key, ksize bytes */ + unsigned char *val; /* private copy of the value, vsize bytes */ + size_t ksize; + size_t vsize; + + struct cache_entry *prev; /* neighbours in the chain's doubly-linked list */ + struct cache_entry *next; +}; + + +struct cache *cache_create(size_t numobjs, unsigned int flags); +int cache_free(struct cache *cd); +int cache_get(struct cache *cd, const unsigned char *key, size_t ksize, + unsigned char **val, size_t *vsize); +int cache_set(struct cache *cd, const unsigned char *k, size_t ksize, + const unsigned char *v, size_t vsize); +int cache_del(struct cache *cd, const unsigned char *key, size_t ksize); + +#endif + diff --git a/nmdb/common.h b/nmdb/common.h new file mode 100644 index 0000000..2fb1185 --- /dev/null +++ b/nmdb/common.h @@ -0,0 +1,31 @@ + +/* Global data used throughout the whole application.
*/ + +#ifndef _COMMON_H +#define _COMMON_H + +/* NOTE(review): the variables below are defined, not just declared, in a + * header included by db.c, main.c and tipc.c. Linking that relies on + * "common" symbols (-fcommon), which GCC stopped enabling by default in + * GCC 10; consider 'extern' declarations here and a single definition in + * one .c file. */ + +/* The cache table */ +#include "cache.h" +struct cache *cache_table; + +/* The queue for database operations */ +#include "queue.h" +struct queue *op_queue; + +/* Settings */ +struct { + int tipc_lower; + int tipc_higher; + int numobjs; + int foreground; + char *dbname; +} settings; + +/* Statistics */ +struct { + unsigned long net_version_mismatch; + unsigned long net_broken_req; + unsigned long net_unk_req; +} stats; +#endif + diff --git a/nmdb/db.c b/nmdb/db.c new file mode 100644 index 0000000..d1b7a68 --- /dev/null +++ b/nmdb/db.c @@ -0,0 +1,125 @@ + +#include <pthread.h> /* threading functions */ +#include <time.h> /* nanosleep() */ +#include <stdlib.h> /* malloc(), free() */ + +#include <stdio.h> + +#include "common.h" +#include "db.h" +#include "be.h" +#include "queue.h" +#include "net-const.h" + + +static void *db_loop(void *arg); +static void process_op(db_t *db, struct queue_entry *e); + + +/* Used to signal the loop that it should exit when the queue becomes empty. + * It's not the cleanest way, but it's simple and effective. + * NOTE(review): it is written by the main thread and read by the db thread + * without any synchronization; consider protecting it with a mutex. */ +static int loop_should_stop = 0; + + +pthread_t *db_loop_start(db_t *db) +{ + pthread_t *thread; + + thread = malloc(sizeof(pthread_t)); + if (thread == NULL) + return NULL; + + /* pthread_create() reports failure through its return value */ + if (pthread_create(thread, NULL, db_loop, (void *) db) != 0) { + free(thread); + return NULL; + } + + return thread; +} + +void db_loop_stop(pthread_t *thread) +{ + loop_should_stop = 1; + pthread_join(*thread, NULL); + free(thread); + return; +} + + +static void *db_loop(void *arg) +{ + struct queue_entry *e; + struct timespec ts; + db_t *db; + + db = (db_t *) arg; + + /* We will sleep this amount of time when the queue is empty. It's + * hardcoded, but needs testing. 
Currently 0.2ms: tv_nsec counts + * nanoseconds, so 1000000 / 5 is 200000ns. */ + ts.tv_sec = 0; + ts.tv_nsec = 1000000 / 5; + + for (;;) { + e = queue_get(op_queue); + if (e == NULL) { + if (loop_should_stop) { + break; + } else { + nanosleep(&ts, NULL); + continue; + } + } + process_op(db, e); + + /* Free the entry that was allocated when tipc queued the + * operation.
This also frees its components. */ + queue_entry_free(e); + } + + return NULL; +} + +static void process_op(db_t *db, struct queue_entry *e) +{ + int rv; + if (e->operation == REQ_SET) { + rv = db_set(db, e->key, e->ksize, e->val, e->vsize); + if (!rv) { + tipc_reply_err(e->req, ERR_DB); + return; + } + tipc_reply_set(e->req, REP_OK); + + } else if (e->operation == REQ_SET_ASYNC) { + db_set(db, e->key, e->ksize, e->val, e->vsize); + + } else if (e->operation == REQ_GET) { + unsigned char *val; + size_t vsize = 128 * 1024; /* buffer size, passed to db_get() as the maximum */ + + val = malloc(vsize); + if (val == NULL) { + tipc_reply_err(e->req, ERR_MEM); + return; + } + rv = db_get(db, e->key, e->ksize, val, &vsize); + if (rv == 0) { + tipc_reply_get(e->req, REP_NOTIN, NULL, 0); + free(val); + return; + } + tipc_reply_get(e->req, REP_OK, val, vsize); + free(val); + + } else if (e->operation == REQ_DEL) { + rv = db_del(db, e->key, e->ksize); + if (rv == 0) { + tipc_reply_del(e->req, REP_NOTIN); + return; + } + tipc_reply_del(e->req, REP_OK); + + } else if (e->operation == REQ_DEL_ASYNC) { + db_del(db, e->key, e->ksize); + + } else { + printf("Unknown op 0x%x\n", e->operation); + } +} + diff --git a/nmdb/db.h b/nmdb/db.h new file mode 100644 index 0000000..c764d6a --- /dev/null +++ b/nmdb/db.h @@ -0,0 +1,12 @@ + +#ifndef _DB_H +#define _DB_H + +#include <pthread.h> /* for pthread_t */ +#include "be.h" /* for db_t */ + +/* Returns NULL on failure (allocation or thread creation). */ +pthread_t *db_loop_start(db_t *db); +void db_loop_stop(pthread_t *thread); + +#endif + diff --git a/nmdb/main.c b/nmdb/main.c new file mode 100644 index 0000000..5bb30c5 --- /dev/null +++ b/nmdb/main.c @@ -0,0 +1,155 @@ + +#include <stdio.h> /* printf() */ +#include <unistd.h> /* fork(), close(), setsid() and getopt() */ +#include <stdlib.h> /* malloc() and atoi() */ +#include <sys/types.h> /* for pid_t */ +#include <string.h> /* for strcpy() and strlen() */ +#include <pthread.h> /* for pthread_t */ + +#include "cache.h" +#include "net.h" +#include "db.h" +#include "common.h" +#include "net-const.h" + +#define DEFDBNAME
"database" + + +static void help() { + char h[] = \ + "nmdb [options]\n" + " -d dbpath database path ('database', must be created with dpmgr)\n" + " -l lower lower TIPC port number (10)\n" + " -L upper upper TIPC port number (= lower)\n" + " -c nobj max. number of objects to be cached, in thousands (128)\n" + " -f don't fork and stay in the foreground\n" + " -h show this help\n" + "\n" + "Please report bugs to Alberto Bertogli (albertito@gmail.com)\n" + "\n"; + printf("%s", h); +} + + +static int load_settings(int argc, char **argv) +{ + int c; + + settings.tipc_lower = -1; + settings.tipc_higher = -1; + settings.numobjs = -1; + settings.foreground = 0; + + settings.dbname = malloc(strlen(DEFDBNAME) + 1); + strcpy(settings.dbname, DEFDBNAME); + + while ((c = getopt(argc, argv, "d:l:L:c:fh?")) != -1) { + switch(c) { + case 'd': + settings.dbname = malloc(strlen(optarg) + 1); + strcpy(settings.dbname, optarg); + break; + case 'l': + settings.tipc_lower = atoi(optarg); + break; + case 'L': + settings.tipc_higher = atoi(optarg); + break; + case 'c': + settings.numobjs = atoi(optarg) * 1024; + break; + case 'f': + settings.foreground = 1; + break; + case 'h': + case '?': + help(); + return 0; + default: + printf("Unknown parameter '%c'\n", c); + return 0; + } + } + + if (settings.tipc_lower == -1) + settings.tipc_lower = SERVER_INST; + if (settings.tipc_higher == -1) + settings.tipc_higher = settings.tipc_lower; + if (settings.numobjs == -1) + settings.numobjs = 128 * 1024; + + /* reject -c 0 and negative values: the cache needs room for objects */ + if (settings.numobjs <= 0) { + printf("Invalid number of objects to cache\n"); + return 0; + } + + return 1; +} + + +static void init_stats(void) +{ + stats.net_version_mismatch = 0; + stats.net_broken_req = 0; + stats.net_unk_req = 0; + return; +} + + +int main(int argc, char **argv) +{ + struct cache *cd; + struct queue *q; + db_t *db; + pid_t pid; + pthread_t *dbthread; + + if (!load_settings(argc, argv)) + return 1; + + init_stats(); + + cd = cache_create(settings.numobjs, 0); + if (cd == NULL) { + perror("Error creating cache"); + return 1; + } + cache_table = cd; + + q = queue_create();
+ if (q == NULL) { + perror("Error creating queue"); + return 1; + } + op_queue = q; + + db = db_open(settings.dbname, 0); + if (db == NULL) { + perror("Error opening DB"); + return 1; + } + + if (!settings.foreground) { + pid = fork(); + if (pid > 0) { + /* parent exits */ + return 0; + } else if (pid < 0) { + perror("Error in fork()"); + return 1; + } + + close(0); + setsid(); + } + + dbthread = db_loop_start(db); + if (dbthread == NULL) { + fprintf(stderr, "Error starting the database thread\n"); + return 1; + } + + net_loop(); + + db_loop_stop(dbthread); + + db_close(db); + + queue_free(q); + + cache_free(cd); + + return 0; +} + diff --git a/nmdb/net-const.h b/nmdb/net-const.h new file mode 100644 index 0000000..159de4b --- /dev/null +++ b/nmdb/net-const.h @@ -0,0 +1,43 @@ + +#ifndef _NET_CONST_H +#define _NET_CONST_H + +/* + * Local network constants. + * Isolated so it's shared between the server and the library code. + */ + +/* TIPC server type and instance -- Hardcoded for now. */ +#define SERVER_TYPE 26001 +#define SERVER_INST 10 + +/* Protocol version, for checking in the network header.
*/ +#define PROTO_VER 1 + +/* Network requests */ +#define REQ_CACHE_GET 0x101 +#define REQ_CACHE_SET 0x102 +#define REQ_CACHE_DEL 0x103 +#define REQ_GET 0x104 +#define REQ_SET 0x105 +#define REQ_DEL 0x106 +#define REQ_SET_ASYNC 0x107 +#define REQ_DEL_ASYNC 0x108 + +/* Network replies (different namespace from requests) */ +#define REP_ERR 0x100 +#define REP_CACHE_HIT 0x101 +#define REP_CACHE_MISS 0x102 +#define REP_OK 0x103 +#define REP_NOTIN 0x104 + +/* Network error replies */ +#define ERR_VER 0x101 /* Version mismatch */ +#define ERR_SEND 0x102 /* Error sending data */ +#define ERR_BROKEN 0x103 /* Broken request */ +#define ERR_UNKREQ 0x104 /* Unknown request */ +#define ERR_MEM 0x105 /* Memory allocation error */ +#define ERR_DB 0x106 /* Database error */ + +#endif + diff --git a/nmdb/net.c b/nmdb/net.c new file mode 100644 index 0000000..6260460 --- /dev/null +++ b/nmdb/net.c @@ -0,0 +1,47 @@ + +#include <signal.h> /* signal constants */ +#include <stdio.h> /* perror() */ +#include <stdlib.h> /* exit() */ + +/* Workaround for libevent 1.1a: the header assumes u_char is typedef'ed to an + * unsigned char, and that "struct timeval" is in scope. */ +typedef unsigned char u_char; +#include <sys/time.h> +#include <event.h> + +#include "tipc.h" + + +/* libevent callback registered by net_loop() for SIGTERM and SIGINT; it asks + * the event loop to exit so that net_loop() returns. */ +static void signal_handler(int fd, short event, void *arg) +{ + printf("Got signal!
Puf!\n"); + event_loopexit(NULL); +} + + +/* Serves TIPC requests through libevent until SIGTERM or SIGINT arrives. + * Exits the process if TIPC can't be initialized. */ +void net_loop(void) +{ + int tipc_fd; + struct event srv_evt, sigterm_evt, sigint_evt; + + tipc_fd = tipc_init(); + if (tipc_fd < 0) { + perror("Error initializing TIPC"); + exit(1); + } + + event_init(); + + event_set(&srv_evt, tipc_fd, EV_READ | EV_PERSIST, tipc_recv, + &srv_evt); + event_add(&srv_evt, NULL); + + signal_set(&sigterm_evt, SIGTERM, signal_handler, &sigterm_evt); + signal_add(&sigterm_evt, NULL); + signal_set(&sigint_evt, SIGINT, signal_handler, &sigint_evt); + signal_add(&sigint_evt, NULL); + + event_dispatch(); +} + + diff --git a/nmdb/net.h b/nmdb/net.h new file mode 100644 index 0000000..9dbf80a --- /dev/null +++ b/nmdb/net.h @@ -0,0 +1,8 @@ + +#ifndef _NET_H +#define _NET_H + +void net_loop(void); + +#endif + diff --git a/nmdb/nmdb.1 b/nmdb/nmdb.1 new file mode 100644 index 0000000..95eded3 --- /dev/null +++ b/nmdb/nmdb.1 @@ -0,0 +1,86 @@ +.TH nmdb 1 "11/Sep/2006" +.SH NAME +nmdb - A TIPC-based database manager +.SH SYNOPSIS +nmdb [-d dbpath] [-l lower] [-L upper] [-c nobj] [-f] [-h] +.SH DESCRIPTION + +nmdb is a TIPC-based database manager. + +It allows all the applications in the cluster to store (key, value) pairs in a +central cache and database in a transparent way. + +It can also be used as a generic caching system (pretty much like memcached), +because it has a very fast cache that can be used without impacting on the +database. + +Before using it, you need to create the backing database. As nmdb uses +.BR qdbm (3) +as a backend, you need to do this using the +.I dpmgr +command. See +.B "INVOCATION EXAMPLE" +below for more important information. + +The database is accessed with the +.BR libnmdb (3) +library. Consult its manual page for programming information. Python bindings +are also available. + +For additional documentation, go to the project's website at +.IR http://auriga.wearlab.de/~alb/nmdb . + +.SH OPTIONS +.TP +.B "-d dbpath" +Indicate the path to the database file to use.
It must have been created with +.RB ' "dpmgr create" +.IR "dbpath" ' +and have read and write permissions. If a name is not provided, "database" +will be used. +.TP +.B "-l lower" +Lower TIPC port number to bind to. Defaults to 10. It's useful if you want to +run more than one nmdb instance in the same TIPC cluster. +.TP +.B "-L upper" +Upper TIPC port number to bind to. Defaults to the value of +.BR lower . +Useful mainly for passive mode (which is not implemented yet). +.TP +.B "-c nobj" +Sets the maximum number of objects the cache will hold, in thousands. Note +that this limits the number of objects, not the memory: the memory used by the +cache layer depends on the size of the cached objects. It defaults to 128, so +the default cache has space to hold 128 thousand objects. +.TP +.B "-f" +Stay in the foreground, don't fork. Useful for debugging. The default is to +fork. +.TP +.B "-h" +Show a brief help. + +.SH INVOCATION EXAMPLE +To create the database: +.B "dpmgr create /var/lib/nmpc-db" + +To run the server with that database: +.B "nmdb -d /var/lib/nmpc-db" + +Be +.I very +careful not to start more than one nmdb instance on the same port at the same +time, even if it's on different machines. TIPC allows you to bind the same +port many times (and it's a very good feature), and all of them will get the +messages, so each request will get several answers, which will confuse the +clients. This behaviour is different from the normal IP networking, where you +can't bind a port twice. + +.SH SEE ALSO +.BR libnmdb (3), +.B TIPC +(http://tipc.sf.net), +.BR qdbm (3). +.SH AUTHORS +Created by Alberto Bertogli (albertito@gmail.com).
diff --git a/nmdb/queue.c b/nmdb/queue.c
new file mode 100644
index 0000000..3badf82
--- /dev/null
+++ b/nmdb/queue.c
@@ -0,0 +1,119 @@
+
+/*
+ * A mutex-protected FIFO of struct queue_entry.
+ *
+ * Entries are put at the top and taken from the bottom; each entry's "prev"
+ * field points to the entry that was put right after it.
+ */
+
+#include <stdlib.h>		/* for malloc() */
+#include <pthread.h>		/* for mutexes */
+
+#include "queue.h"
+
+
+/* queue_put() and queue_get() only touch the list and the size counter
+ * with q->lock held. */
+#define queue_lock(q) do { pthread_mutex_lock(&((q)->lock)); } while (0)
+#define queue_unlock(q) do { pthread_mutex_unlock(&((q)->lock)); } while (0)
+
+
+/* Allocate an empty queue. Returns NULL if the memory or the mutex could
+ * not be set up. */
+struct queue *queue_create(void)
+{
+	struct queue *q;
+	pthread_mutexattr_t attr;
+	int rv;
+
+	q = malloc(sizeof(*q));
+	if (q == NULL)
+		return NULL;
+
+	q->size = 0;
+	q->top = NULL;
+	q->bottom = NULL;
+
+	if (pthread_mutexattr_init(&attr) != 0) {
+		free(q);
+		return NULL;
+	}
+	pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
+	rv = pthread_mutex_init(&(q->lock), &attr);
+	pthread_mutexattr_destroy(&attr);
+
+	if (rv != 0) {
+		free(q);
+		return NULL;
+	}
+
+	return q;
+}
+
+
+/* Free the queue and every entry still in it (with queue_entry_free()).
+ * The mutex is destroyed, so no other thread may use the queue anymore. */
+void queue_free(struct queue *q)
+{
+	struct queue_entry *e;
+
+	while ((e = queue_get(q)) != NULL)
+		queue_entry_free(e);
+
+	pthread_mutex_destroy(&(q->lock));
+	free(q);
+}
+
+
+/* Allocate a blank entry. All pointer fields start out NULL, so
+ * queue_entry_free() is safe to call on it at any point. */
+struct queue_entry *queue_entry_create(void)
+{
+	struct queue_entry *e;
+
+	e = malloc(sizeof(*e));
+	if (e == NULL)
+		return NULL;
+
+	e->operation = 0;
+	e->req = NULL;
+	e->key = NULL;
+	e->val = NULL;
+	e->ksize = 0;
+	e->vsize = 0;
+	e->prev = NULL;
+
+	return e;
+}
+
+/* Free an entry along with its request copy (and its clisa), key and
+ * value. Like free(), a NULL entry is accepted and ignored. */
+void queue_entry_free(struct queue_entry *e)
+{
+	if (e == NULL)
+		return;
+
+	if (e->req != NULL) {
+		free(e->req->clisa);
+		free(e->req);
+	}
+	free(e->key);
+	free(e->val);
+	free(e);
+}
+
+
+/* Put an entry at the top (newest end) of the queue. */
+void queue_put(struct queue *q, struct queue_entry *e)
+{
+	/* e becomes the newest entry, so nothing follows it; drop any stale
+	 * link left over from a previous queue_get(). e is not shared yet,
+	 * so this needs no lock. */
+	e->prev = NULL;
+
+	queue_lock(q);
+	if (q->top == NULL) {
+		q->top = q->bottom = e;
+	} else {
+		q->top->prev = e;
+		q->top = e;
+	}
+	q->size += 1;
+	queue_unlock(q);
+}
+
+
+/* Take the oldest entry (the bottom) out of the queue. Returns NULL if the
+ * queue is empty. */
+struct queue_entry *queue_get(struct queue *q)
+{
+	struct queue_entry *e;
+
+	queue_lock(q);
+	e = q->bottom;
+	if (e != NULL) {
+		q->bottom = e->prev;
+		if (q->bottom == NULL) {
+			/* it's empty now */
+			q->top = NULL;
+		}
+
+		/* only count real removals: decrementing on an empty queue
+		 * would wrap the unsigned size around */
+		q->size -= 1;
+		e->prev = NULL;
+	}
+	queue_unlock(q);
+
+	return e;
+}
+
+
diff --git a/nmdb/queue.h b/nmdb/queue.h
new file mode 100644
index 0000000..2933e1a
--- /dev/null
+++ b/nmdb/queue.h
@@ -0,0 +1,43 @@
+
+#ifndef NMDB_QUEUE_H
+#define NMDB_QUEUE_H
+
+#include <pthread.h>		/* for mutexes */
+#include <stdint.h>		/* for uint32_t */
+#include "tipc.h"		/* for req_info */
+
+/* A FIFO of queue_entry structures, protected by "lock". */
+struct queue {
+	pthread_mutex_t lock;
+	size_t size;		/* number of entries currently queued */
+
+	/* top is the newest entry, bottom the oldest one */
+	struct queue_entry *top, *bottom;
+};
+
+struct queue_entry {
+	uint32_t operation;
+	struct req_info *req;
+
+	unsigned char *key;
+	unsigned char *val;
+	size_t ksize;
+	size_t vsize;
+
+	/* The entry that was queued right after this one (towards top). */
+	struct queue_entry *prev;
+	/* A pointer to the next element on the list is actually not
+	 * necessary, because it's not needed for put and get.
+	 */
+};
+
+
+struct queue *queue_create(void);
+void queue_free(struct queue *q);
+
+struct queue_entry *queue_entry_create(void);
+void queue_entry_free(struct queue_entry *e);
+
+void queue_put(struct queue *q, struct queue_entry *e);
+struct queue_entry *queue_get(struct queue *q);
+
+
+#endif
+
diff --git a/nmdb/tipc.c b/nmdb/tipc.c
new file mode 100644
index 0000000..1715d93
--- /dev/null
+++ b/nmdb/tipc.c
@@ -0,0 +1,490 @@
+
+#include <sys/types.h>	/* socket defines */
+#include <sys/socket.h>	/* socket functions */
+#include <stdlib.h>	/* malloc() */
+#include <linux/tipc.h>	/* tipc stuff */
+#include <stdio.h>	/* perror() */
+#include <stdint.h>	/* uint32_t and friends */
+#include <arpa/inet.h>	/* htonl() and friends */
+#include <string.h>	/* memcpy() */
+
+#include "tipc.h"
+#include "common.h"
+#include "queue.h"
+#include "net-const.h"
+
+
+static void parse_msg(struct req_info *req, unsigned char *buf,
+		size_t bsize);
+static void parse_get(struct req_info *req, int impact_db);
+static void parse_set(struct req_info *req, int impact_db, int async);
+static void parse_del(struct req_info *req, int impact_db, int async);
+
+
+/*
+ * Miscellaneous helper functions
+ */
+
+/* Send an error reply to the client.
+ * Network format: ID (4), REP_ERR (4), error code (4). req->id is already
+ * stored in network byte order; the other two are converted here. */
+static void rep_send_error(const struct req_info *req, const unsigned int code)
+{
+	uint32_t rep, c;
+	ssize_t rv;
+	unsigned char minibuf[3 * 4];
+
+	rep = htonl(REP_ERR);
+	c = htonl(code);
+	memcpy(minibuf, &(req->id), 4);
+	memcpy(minibuf + 4, &rep, 4);
+	memcpy(minibuf + 8, &c, 4);
+
+	/* If this send fails, there's nothing to be done */
+	rv = sendto(req->fd, minibuf, sizeof(minibuf), 0,
+			(struct sockaddr *) req->clisa, req->clilen);
+	if (rv < 0)
+		perror("rep_send_error() failed");
+}
+
+
+/* Send buf to the client described by req. If that fails, try to tell the
+ * client with an ERR_SEND reply. Returns 1 on success, 0 on error. */
+static int rep_send(const struct req_info *req, const unsigned char *buf,
+		const size_t size)
+{
+	ssize_t rv;
+
+	rv = sendto(req->fd, buf, size, 0,
+			(struct sockaddr *) req->clisa, req->clilen);
+	if (rv < 0) {
+		rep_send_error(req, ERR_SEND);
+		return 0;
+	}
+	return 1;
+}
+
+
+/* Send small replies, consisting of only a value: ID (4), reply (4). */
+static void mini_reply(struct req_info *req, uint32_t reply)
+{
+	/* We use a mini buffer to speedup the small replies, to avoid the
+	 * malloc() overhead. */
+	unsigned char minibuf[8];
+
+	reply = htonl(reply);
+	memcpy(minibuf, &(req->id), 4);
+	memcpy(minibuf + 4, &reply, 4);
+	rep_send(req, minibuf, sizeof(minibuf));
+}
+
+
+/* Create a queue entry structure based on the parameters passed. Memory
+ * allocated here will be free()'d in queue_entry_free(). It's not the
+ * cleanest way, but the alternatives are even messier.
+ */
+static struct queue_entry *make_queue_entry(struct req_info *req,
+		uint32_t operation, const unsigned char *key, size_t ksize,
+		const unsigned char *val, size_t vsize)
+{
+	struct queue_entry *e;
+
+	e = queue_entry_create();
+	if (e == NULL)
+		return NULL;
+
+	/* queue_entry_free() looks at these pointers, so give them a known
+	 * value before the first error path can reach it. */
+	e->req = NULL;
+	e->key = NULL;
+	e->val = NULL;
+
+	e->operation = operation;
+	e->ksize = ksize;
+	e->vsize = vsize;
+
+	/* Every allocation below is attached to e as soon as it is made, so
+	 * a single queue_entry_free() undoes everything on error. */
+	if (key != NULL) {
+		e->key = malloc(ksize);
+		if (e->key == NULL)
+			goto error;
+		memcpy(e->key, key, ksize);
+	}
+
+	if (val != NULL) {
+		e->val = malloc(vsize);
+		if (e->val == NULL)
+			goto error;
+		memcpy(e->val, val, vsize);
+	}
+
+	/* Create a copy of req, including clisa */
+	e->req = malloc(sizeof(*e->req));
+	if (e->req == NULL)
+		goto error;
+	*e->req = *req;
+
+	/* The struct copy still points at the caller's clisa (tipc_recv()
+	 * keeps it on its stack). Overwrite that pointer immediately, so that
+	 * queue_entry_free() can never free() memory we don't own. */
+	e->req->clisa = malloc(sizeof(struct sockaddr_tipc));
+	if (e->req->clisa == NULL)
+		goto error;
+	memcpy(e->req->clisa, req->clisa, sizeof(struct sockaddr_tipc));
+
+	/* clear out unused fields */
+	e->req->payload = NULL;
+	e->req->psize = 0;
+
+	return e;
+
+error:
+	queue_entry_free(e);
+	return NULL;
+}
+
+
+/* The tipc_reply_* functions are used by the db code to send the network
+ * replies.
+ */
+
+/* Send an error reply with the given code. */
+void tipc_reply_err(struct req_info *req, uint32_t reply)
+{
+	rep_send_error(req, reply);
+}
+
+/* Reply to a get. A NULL val is answered with just the reply code (a
+ * miss); otherwise the value is sent along with its size. */
+void tipc_reply_get(struct req_info *req, uint32_t reply,
+		unsigned char *val, size_t vsize)
+{
+	unsigned char *buf;
+	size_t bsize;
+	uint32_t t;
+
+	if (val == NULL) {
+		/* miss */
+		mini_reply(req, reply);
+		return;
+	}
+
+	/* The reply length is:
+	 * 4		id
+	 * 4		reply code
+	 * 4		vsize
+	 * vsize	val
+	 */
+	bsize = 4 + 4 + 4 + vsize;
+	buf = malloc(bsize);
+	if (buf == NULL) {
+		rep_send_error(req, ERR_MEM);
+		return;
+	}
+
+	reply = htonl(reply);
+	t = htonl(vsize);
+
+	memcpy(buf, &(req->id), 4);
+	memcpy(buf + 4, &reply, 4);
+	memcpy(buf + 8, &t, 4);
+	memcpy(buf + 12, val, vsize);
+
+	rep_send(req, buf, bsize);
+	free(buf);
+}
+
+
+void tipc_reply_set(struct req_info *req, uint32_t reply)
+{
+	mini_reply(req, reply);
+}
+
+
+void tipc_reply_del(struct req_info *req, uint32_t reply)
+{
+	mini_reply(req, reply);
+}
+
+
+/*
+ * Main functions for receiving and parsing
+ */
+
+/* needed by tipc_init() to release the socket on error */
+#include <unistd.h>	/* close() */
+#include <errno.h>	/* errno */
+
+/* Create a TIPC RDM socket and bind it to the SERVER_TYPE/SERVER_INST
+ * name with cluster scope. Returns the socket, or -1 on error (errno is
+ * left as set by the failing call). */
+int tipc_init(void)
+{
+	int fd, rv, saved_errno;
+	static struct sockaddr_tipc srvsa;
+
+	srvsa.family = AF_TIPC;
+	srvsa.addrtype = TIPC_ADDR_NAMESEQ;
+	srvsa.addr.nameseq.type = SERVER_TYPE;
+	srvsa.addr.nameseq.lower = SERVER_INST;
+	srvsa.addr.nameseq.upper = SERVER_INST;
+	srvsa.scope = TIPC_CLUSTER_SCOPE;
+
+	fd = socket(AF_TIPC, SOCK_RDM, 0);
+	if (fd < 0)
+		return -1;
+
+	rv = bind(fd, (struct sockaddr *) &srvsa, sizeof(srvsa));
+	if (rv < 0) {
+		/* don't leak the socket, but keep bind()'s errno */
+		saved_errno = errno;
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	return fd;
+}
+
+/* Called by libevent for each receive event */
+void tipc_recv(int fd, short event, void *arg)
+{
+	ssize_t rv;
+	struct req_info req;
+	struct sockaddr_tipc clisa;
+	socklen_t clilen;
+	size_t bsize;
+
+	/* Allocate enough to hold the max msg length of 66000 bytes.
+	 * Originally, this was malloc()ed, but using the stack made it go
+	 * from 27 usec for each set operation, to 23 usec. While it may sound
+	 * worthless, it made test1 go from 3.213s to 2.345s for 37618
+	 * operations.
+	 * TODO: check for negative impacts (beside being ugly, obviously)
+	 */
+	unsigned char buf[128 * 1024];
+	bsize = sizeof(buf);
+
+	(void) event;
+	(void) arg;
+
+	clilen = sizeof(clisa);
+
+	rv = recvfrom(fd, buf, bsize, 0, (struct sockaddr *) &clisa,
+			&clilen);
+	if (rv <= 0) {
+		/* rv == 0 means "return of an undeliverable message", which
+		 * we ignore; -1 means other error. */
+		return;
+	}
+
+	/* parse_msg() always reads the 4-byte header and the 4-byte
+	 * command, so anything shorter is broken */
+	if (rv < 8) {
+		stats.net_broken_req++;
+		return;
+	}
+
+	req.fd = fd;
+	req.clisa = &clisa;
+	req.clilen = clilen;
+
+	/* parse the message */
+	parse_msg(&req, buf, rv);
+}
+
+
+/* Read a network-order (big endian) 32-bit value from a byte buffer,
+ * without assuming any alignment. */
+static uint32_t get_u32(const unsigned char *p)
+{
+	uint32_t v;
+
+	memcpy(&v, p, sizeof(v));
+	return ntohl(v);
+}
+
+/* Count a malformed request and reject it with ERR_BROKEN. */
+static void reply_broken(struct req_info *req)
+{
+	stats.net_broken_req++;
+	rep_send_error(req, ERR_BROKEN);
+}
+
+
+/* Main message parsing and dissecting.
+ * Message format: header (4: version in the top 4 bits, request id in the
+ * low 28 bits), command (4), payload. tipc_recv() guarantees bsize >= 8. */
+static void parse_msg(struct req_info *req, unsigned char *buf, size_t bsize)
+{
+	uint32_t hdr, id, cmd;
+	uint8_t ver;
+
+	hdr = get_u32(buf);
+
+	/* FIXME: little endian-only */
+	ver = (hdr & 0xF0000000) >> 28;
+	id = hdr & 0x0FFFFFFF;
+
+	cmd = get_u32(buf + 4);
+
+	/* store the id encoded in network byte order, so that we don't have
+	 * to calculate it at send time; it's set before any reply (including
+	 * the version error below) can use it */
+	req->id = htonl(id);
+	req->cmd = cmd;
+
+	/* The payload is whatever follows the command. It may be empty, so
+	 * the parse_* functions check psize before touching it. */
+	req->payload = buf + 8;
+	req->psize = bsize - 8;
+
+	if (ver != PROTO_VER) {
+		stats.net_version_mismatch++;
+		rep_send_error(req, ERR_VER);
+		return;
+	}
+
+	if (cmd == REQ_CACHE_GET)
+		parse_get(req, 0);
+	else if (cmd == REQ_CACHE_SET)
+		parse_set(req, 0, 0);
+	else if (cmd == REQ_CACHE_DEL)
+		parse_del(req, 0, 0);
+	else if (cmd == REQ_GET)
+		parse_get(req, 1);
+	else if (cmd == REQ_SET)
+		parse_set(req, 1, 0);
+	else if (cmd == REQ_DEL)
+		parse_del(req, 1, 0);
+	else if (cmd == REQ_SET_ASYNC)
+		parse_set(req, 1, 1);
+	else if (cmd == REQ_DEL_ASYNC)
+		parse_del(req, 1, 1);
+	else {
+		stats.net_unk_req++;
+		rep_send_error(req, ERR_UNKREQ);
+	}
+}
+
+
+/* Parse a key-only payload (used by get and del):
+ *	4	ksize
+ *	ksize	key
+ * Returns 1 and fills *key and *ksize if the key fits in the payload;
+ * otherwise rejects the request and returns 0. */
+static int parse_key(struct req_info *req, unsigned char **key,
+		uint32_t *ksize)
+{
+	uint32_t size;
+
+	if (req->psize < sizeof(uint32_t)) {
+		reply_broken(req);
+		return 0;
+	}
+
+	size = get_u32(req->payload);
+	if (req->psize - sizeof(uint32_t) < size) {
+		reply_broken(req);
+		return 0;
+	}
+
+	*key = req->payload + sizeof(uint32_t);
+	*ksize = size;
+	return 1;
+}
+
+
+static void parse_get(struct req_info *req, int impact_db)
+{
+	int hit;
+	unsigned char *key;
+	uint32_t ksize;
+	unsigned char *val = NULL;
+	size_t vsize = 0;
+
+	if (!parse_key(req, &key, &ksize))
+		return;
+
+	hit = cache_get(cache_table, key, ksize, &val, &vsize);
+
+	if (hit) {
+		tipc_reply_get(req, REP_CACHE_HIT, val, vsize);
+	} else if (!impact_db) {
+		mini_reply(req, REP_CACHE_MISS);
+	} else {
+		struct queue_entry *e;
+
+		/* not cached: queue it so the database gets asked */
+		e = make_queue_entry(req, REQ_GET, key, ksize, NULL, 0);
+		if (e == NULL) {
+			rep_send_error(req, ERR_MEM);
+			return;
+		}
+		queue_put(op_queue, e);
+	}
+}
+
+
+static void parse_set(struct req_info *req, int impact_db, int async)
+{
+	int rv;
+	unsigned char *key, *val;
+	uint32_t ksize, vsize;
+	const uint32_t max = 65536;
+	const size_t hdrsize = 2 * sizeof(uint32_t);
+
+	/* Request format:
+	 * 4		ksize
+	 * 4		vsize
+	 * ksize	key
+	 * vsize	val
+	 */
+	if (req->psize < hdrsize) {
+		reply_broken(req);
+		return;
+	}
+	ksize = get_u32(req->payload);
+	vsize = get_u32(req->payload + sizeof(uint32_t));
+
+	/* Sanity check on sizes:
+	 * - ksize and vsize must both be <= 2^16 = 64k
+	 * - ksize + vsize <= 2^16 = 64k (so the sum can't overflow)
+	 * - key and value must fit in the payload after the size fields
+	 */
+	if ( (ksize > max) || (vsize > max) || ( (ksize + vsize) > max) ||
+			( (ksize + vsize) > req->psize - hdrsize) ) {
+		reply_broken(req);
+		return;
+	}
+
+	key = req->payload + hdrsize;
+	val = key + ksize;
+
+	rv = cache_set(cache_table, key, ksize, val, vsize);
+	if (!rv) {
+		rep_send_error(req, ERR_MEM);
+		return;
+	}
+
+	if (impact_db) {
+		struct queue_entry *e;
+		uint32_t request;
+
+		request = async ? REQ_SET_ASYNC : REQ_SET;
+
+		e = make_queue_entry(req, request, key, ksize, val, vsize);
+		if (e == NULL) {
+			rep_send_error(req, ERR_MEM);
+			return;
+		}
+		queue_put(op_queue, e);
+
+		/* only async sets are acknowledged here; for synchronous ones
+		 * no reply is sent from this function (presumably the db code
+		 * replies once the queued entry is processed) */
+		if (async)
+			mini_reply(req, REP_OK);
+	} else {
+		mini_reply(req, REP_OK);
+	}
+}
+
+
+static void parse_del(struct req_info *req, int impact_db, int async)
+{
+	int hit;
+	unsigned char *key;
+	uint32_t ksize;
+	struct queue_entry *e;
+	uint32_t request;
+
+	if (!parse_key(req, &key, &ksize))
+		return;
+
+	hit = cache_del(cache_table, key, ksize);
+
+	if (!impact_db) {
+		if (hit)
+			mini_reply(req, REP_OK);
+		else
+			mini_reply(req, REP_NOTIN);
+		return;
+	}
+
+	request = async ? REQ_DEL_ASYNC : REQ_DEL;
+
+	e = make_queue_entry(req, request, key, ksize, NULL, 0);
+	if (e == NULL) {
+		rep_send_error(req, ERR_MEM);
+		return;
+	}
+	queue_put(op_queue, e);
+
+	if (async)
+		mini_reply(req, REP_OK);
+}
+
+
diff --git a/nmdb/tipc.h b/nmdb/tipc.h
new file mode 100644
index 0000000..a717836
--- /dev/null
+++ b/nmdb/tipc.h
@@ -0,0 +1,34 @@
+
+#ifndef NMDB_TIPC_H
+#define NMDB_TIPC_H
+
+#include <stdint.h>		/* uint32_t */
+#include <sys/types.h>	/* size_t */
+#include <sys/socket.h>	/* socklen_t */
+#include <linux/tipc.h>	/* sockaddr_tipc */
+
+/*
+ * Per-request state: where the request came from (and where replies go),
+ * plus the decoded operation.
+ */
+struct req_info {
+	/* network information */
+	int fd;				/* socket the reply is sent through */
+	struct sockaddr_tipc *clisa;	/* client address */
+	socklen_t clilen;		/* size of *clisa */
+
+	/* operation information */
+	uint32_t id;			/* request id, in network byte order */
+	uint32_t cmd;			/* one of the REQ_* commands */
+	unsigned char *payload;		/* data following the command */
+	size_t psize;			/* payload length in bytes */
+};
+
+
+/* Socket setup: returns the bound server socket, or -1 on error. */
+int tipc_init(void);
+
+/* Receive callback (invoked by libevent): handles one incoming message. */
+void tipc_recv(int fd, short event, void *arg);
+
+/* Replies to the request described by req. */
+void tipc_reply_err(struct req_info *req, uint32_t reply);
+void tipc_reply_get(struct req_info *req, uint32_t reply,
+		unsigned char *val, size_t vsize);
+void tipc_reply_set(struct req_info *req, uint32_t reply);
+void tipc_reply_del(struct req_info *req, uint32_t reply);
+
+#endif
+
diff --git a/python/LICENSE b/python/LICENSE
new file mode 100644
index 0000000..e92bf0d
--- /dev/null
+++ b/python/LICENSE
@@ -0,0 +1,34 @@
+
+I don't like licenses, because I don't like having to worry about all this
+legal stuff just for a simple piece of software I don't really mind anyone
+using. But I also believe that it's important that people share and give back;
+so I'm placing this library under the following license, so you feel guilty if
+you don't ;)
+
+
+BOLA - Buena Onda License Agreement
+-----------------------------------
+
+This work is provided 'as-is', without any express or implied warranty. In no
+event will the authors be held liable for any damages arising from the use of
+this work.
+
+To all effects and purposes, this work is to be considered Public Domain.
+
+
+However, if you want to be "Buena onda", you should:
+
+1. Not take credit for it, and give proper recognition to the authors.
+2. Share your modifications, so everybody benefits from them.
+4. Do something nice for the authors.
+5. Help someone who needs it: sign up for some volunteer work or help your
+   neighbour paint the house.
+6. Don't waste. Anything, but specially energy that comes from natural
+   non-renovable resources.
Extra points if you discover or invent something
+   to replace them.
+7. Be tolerant. Everything that's good in nature comes from cooperation.
+
+The order is important, and the further you go the more "Buena onda" you are.
+Make the world a better place: be "Buena onda".
+
+
diff --git a/python/nmdb.py b/python/nmdb.py
new file mode 100644
index 0000000..d460e30
--- /dev/null
+++ b/python/nmdb.py
@@ -0,0 +1,73 @@
+
+#
+# libnmdb python wrapper
+# Alberto Bertogli (albertito@gmail.com)
+#
+
+import nmdb_ll
+
+class NetworkError (Exception):
+	pass
+
+class _nmdbDict (object):
+	"""Dictionary-like access on top of an nmdb_ll connection.
+
+	op_get, op_set and op_delete are the connection methods used for item
+	lookup, assignment and deletion respectively.
+	"""
+	def __init__(self, db, op_get, op_set, op_delete):
+		self.db = db
+		self.op_get = op_get
+		self.op_set = op_set
+		self.op_delete = op_delete
+
+	def __getitem__(self, key):
+		try:
+			r = self.op_get(key)
+		except Exception:
+			# don't swallow KeyboardInterrupt/SystemExit
+			raise NetworkError
+		if not r:
+			# an empty result is treated as a miss
+			raise KeyError(key)
+		return r
+
+	def __setitem__(self, key, val):
+		r = self.op_set(key, val)
+		if r <= 0:
+			raise NetworkError
+		return 1
+
+	def __delitem__(self, key):
+		r = self.op_delete(key)
+		if r < 0:
+			raise NetworkError
+		elif r == 0:
+			raise KeyError(key)
+		return 1
+
+	def __contains__(self, key):
+		try:
+			r = self.op_get(key)
+		except Exception:
+			raise NetworkError
+		return bool(r)
+
+	def has_key(self, key):
+		return self.__contains__(key)
+
+
+class Cache (_nmdbDict):
+	def __init__(self, port = -1):
+		db = nmdb_ll.connect(port)
+		_nmdbDict.__init__(self, db, db.cache_get, db.cache_set,
+				db.cache_delete)
+
+class DB (_nmdbDict):
+	def __init__(self, port = -1):
+		db = nmdb_ll.connect(port)
+		_nmdbDict.__init__(self, db, db.get, db.set, db.delete)
+
+
+class AsyncDB (_nmdbDict):
+	def __init__(self, port = -1):
+		db = nmdb_ll.connect(port)
+		_nmdbDict.__init__(self, db, db.get, db.set_async,
+				db.delete_async)
+
+
diff --git a/python/nmdb_ll.c b/python/nmdb_ll.c
new file mode 100644
index 0000000..b963bb7
--- /dev/null
+++ b/python/nmdb_ll.c
@@ -0,0 +1,302 @@
+
+/*
+ * Python bindings for libnmdb
+ * Alberto Bertogli (albertito@gmail.com)
+ *
+ * This is the low-level module, used by the python one to construct
+ * friendlier objects.
+ */
+
+#include <Python.h>
+#include <nmdb.h>
+
+
+/* Size of the buffer the get functions receive values into; it is meant
+ * to be enough to hold any value. */
+#define VAL_BUF_SIZE (128 * 1024)
+
+
+/*
+ * Type definitions
+ */
+
+typedef struct {
+	PyObject_HEAD
+	nmdb_t *db;
+} nmdbobject;
+static PyTypeObject nmdbType;
+
+/*
+ * The nmdb object
+ */
+
+/* delete */
+static void db_dealloc(nmdbobject *db)
+{
+	if (db->db) {
+		nmdb_free(db->db);
+	}
+	PyObject_Del(db);
+}
+
+
+/* Turn the result of a libnmdb get into a Python object, and free val.
+ * rv < 0 raises IOError from errno; otherwise the first rv bytes of val
+ * are returned as a string (empty when rv == 0). */
+static PyObject *make_get_result(unsigned char *val, long rv)
+{
+	PyObject *r;
+
+	if (rv < 0) {
+		/* FIXME: define a better exception */
+		r = PyErr_SetFromErrno(PyExc_IOError);
+	} else {
+		r = PyString_FromStringAndSize((char *) val, rv);
+	}
+
+	free(val);
+	return r;
+}
+
+
+/* cache set */
+static PyObject *db_cache_set(nmdbobject *db, PyObject *args)
+{
+	unsigned char *key, *val;
+	int ksize, vsize;
+	int rv;
+
+	if (!PyArg_ParseTuple(args, "s#s#:cache_set", &key, &ksize,
+				&val, &vsize)) {
+		return NULL;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = nmdb_cache_set(db->db, key, ksize, val, vsize);
+	Py_END_ALLOW_THREADS
+
+	return PyLong_FromLong(rv);
+}
+
+/* cache get */
+static PyObject *db_cache_get(nmdbobject *db, PyObject *args)
+{
+	unsigned char *key, *val;
+	int ksize, vsize;
+	long rv;
+
+	if (!PyArg_ParseTuple(args, "s#:cache_get", &key, &ksize)) {
+		return NULL;
+	}
+
+	vsize = VAL_BUF_SIZE;
+	val = malloc(vsize);
+	if (val == NULL)
+		return PyErr_NoMemory();
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = nmdb_cache_get(db->db, key, ksize, val, vsize);
+	Py_END_ALLOW_THREADS
+
+	return make_get_result(val, rv);
+}
+
+/* cache delete */
+static PyObject *db_cache_delete(nmdbobject *db, PyObject *args)
+{
+	unsigned char *key;
+	int ksize;
+	int rv;
+
+	if (!PyArg_ParseTuple(args, "s#:cache_delete", &key, &ksize)) {
+		return NULL;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = nmdb_cache_del(db->db, key, ksize);
+	Py_END_ALLOW_THREADS
+
+	return PyLong_FromLong(rv);
+}
+
+/* db set */
+static PyObject *db_set(nmdbobject *db, PyObject *args)
+{
+	unsigned char *key, *val;
+	int ksize, vsize;
+	int rv;
+
+	if (!PyArg_ParseTuple(args, "s#s#:set", &key, &ksize,
+				&val, &vsize)) {
+		return NULL;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = nmdb_set(db->db, key, ksize, val, vsize);
+	Py_END_ALLOW_THREADS
+
+	return PyLong_FromLong(rv);
+}
+
+/* db get */
+static PyObject *db_get(nmdbobject *db, PyObject *args)
+{
+	unsigned char *key, *val;
+	int ksize, vsize;
+	long rv;
+
+	if (!PyArg_ParseTuple(args, "s#:get", &key, &ksize)) {
+		return NULL;
+	}
+
+	vsize = VAL_BUF_SIZE;
+	val = malloc(vsize);
+	if (val == NULL)
+		return PyErr_NoMemory();
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = nmdb_get(db->db, key, ksize, val, vsize);
+	Py_END_ALLOW_THREADS
+
+	return make_get_result(val, rv);
+}
+
+/* db delete */
+static PyObject *db_delete(nmdbobject *db, PyObject *args)
+{
+	unsigned char *key;
+	int ksize;
+	int rv;
+
+	if (!PyArg_ParseTuple(args, "s#:delete", &key, &ksize)) {
+		return NULL;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = nmdb_del(db->db, key, ksize);
+	Py_END_ALLOW_THREADS
+
+	return PyLong_FromLong(rv);
+}
+
+
+/* db set async */
+static PyObject *db_set_async(nmdbobject *db, PyObject *args)
+{
+	unsigned char *key, *val;
+	int ksize, vsize;
+	int rv;
+
+	if (!PyArg_ParseTuple(args, "s#s#:set_async", &key, &ksize,
+				&val, &vsize)) {
+		return NULL;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = nmdb_set_async(db->db, key, ksize, val, vsize);
+	Py_END_ALLOW_THREADS
+
+	return PyLong_FromLong(rv);
+}
+
+/* db delete async */
+static PyObject *db_delete_async(nmdbobject *db, PyObject *args)
+{
+	unsigned char *key;
+	int ksize;
+	int rv;
+
+	if (!PyArg_ParseTuple(args, "s#:delete_async", &key, &ksize)) {
+		return NULL;
+	}
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = nmdb_del_async(db->db, key, ksize);
+	Py_END_ALLOW_THREADS
+
+	return PyLong_FromLong(rv);
+}
+
+
+
+/* nmdb method table */
+
+static PyMethodDef nmdb_methods[] = {
+	{ "cache_set", (PyCFunction) db_cache_set, METH_VARARGS, NULL },
+	{ "cache_get", (PyCFunction) db_cache_get, METH_VARARGS, NULL },
+	{ "cache_delete", (PyCFunction) db_cache_delete, METH_VARARGS, NULL },
+	{ "set", (PyCFunction) db_set, METH_VARARGS, NULL },
+	{ "get", (PyCFunction) db_get, METH_VARARGS, NULL },
+	{ "delete", (PyCFunction) db_delete, METH_VARARGS, NULL },
+	{ "set_async", (PyCFunction) db_set_async, METH_VARARGS, NULL },
+	{ "delete_async", (PyCFunction) db_delete_async, METH_VARARGS, NULL },
+
+	{ NULL }
+};
+
+static PyObject *db_getattr(nmdbobject *db, char *name)
+{
+	return Py_FindMethod(nmdb_methods, (PyObject *)db, name);
+}
+
+static PyTypeObject nmdbType = {
+	PyObject_HEAD_INIT(NULL)
+	0,
+	"nmdb_ll.nmdb",
+	sizeof(nmdbobject),
+	0,
+	(destructor) db_dealloc,
+	0,
+	(getattrfunc) db_getattr,
+};
+
+
+/*
+ * The module
+ */
+
+/* connect, returns an nmdb object */
+static PyObject *db_connect(PyObject *self, PyObject *args)
+{
+	nmdbobject *db;
+	int port;	/* the "i" format unit stores an int, not a long */
+
+	if (!PyArg_ParseTuple(args, "i:connect", &port)) {
+		return NULL;
+	}
+
+	db = PyObject_New(nmdbobject, &nmdbType);
+	if (db == NULL)
+		return NULL;
+
+	db->db = nmdb_init(port);
+	if (db->db == NULL) {
+		/* release the half-built object; db_dealloc() skips the
+		 * NULL db->db */
+		Py_DECREF(db);
+		return PyErr_NoMemory();
+	}
+
+	/* XXX: is this necessary? */
+	if (PyErr_Occurred()) {
+		/* db_dealloc() also frees db->db */
+		Py_DECREF(db);
+		return NULL;
+	}
+
+	return (PyObject *) db;
+}
+
+static PyMethodDef nmdb_functions[] = {
+	{ "connect", (PyCFunction) db_connect, METH_VARARGS, NULL },
+	{ NULL }
+};
+
+PyMODINIT_FUNC initnmdb_ll(void)
+{
+	nmdbType.ob_type = &PyType_Type;
+
+	/* the module object returned is a borrowed reference, and nothing
+	 * else needs to be added to it */
+	Py_InitModule("nmdb_ll", nmdb_functions);
+}
+
+
+
diff --git a/python/setup.py b/python/setup.py
new file mode 100644
index 0000000..0ba115e
--- /dev/null
+++ b/python/setup.py
@@ -0,0 +1,17 @@
+
+from distutils.core import setup, Extension
+
+nmdb_ll = Extension("nmdb_ll",
+		libraries = ['nmdb'],
+		sources = ['nmdb_ll.c'])
+
+setup(
+	name = 'nmdb',
+	description = "libnmdb bindings",
+	author = "Alberto Bertogli",
+	author_email = "albertito@gmail.com",
+	url = "http://auriga.wearlab.de/~alb/nmdb",
+	py_modules = ['nmdb'],
+	ext_modules = [nmdb_ll]
+)
+