git » nmdb » commit 8578ddf

Support building with multiple backends

author Alberto Bertogli
2010-04-17 20:42:29 UTC
committer Alberto Bertogli
2010-04-17 20:42:29 UTC
parent 244188082f180a9fdd4cff9a02cf0106e3971fc1

Support building with multiple backends

This patch makes nmdb build with support with several backends, and lets the
user select which one to use at runtime.

This includes major changes to the internal database API, but luckily it never
goes outside the dbloop.c, and most changes are simple.

The Makefile has been extended to auto-detect available backends at build
time, so the user no longer needs to select which ones to use (although it is
still possible).

Signed-off-by: Alberto Bertogli <albertito@blitiri.com.ar>

INSTALL +17 -16
nmdb/Makefile +28 -22
nmdb/be-bdb.c +59 -13
nmdb/be-null.c +41 -8
nmdb/be-qdbm.c +54 -10
nmdb/be-tc.c +48 -12
nmdb/be-tdb.c +58 -12
nmdb/be.c +45 -0
nmdb/be.h +84 -33
nmdb/common.h +2 -0
nmdb/dbloop.c +13 -12
nmdb/dbloop.h +2 -2
nmdb/main.c +21 -4

diff --git a/INSTALL b/INSTALL
index 862192d..f1c9574 100644
--- a/INSTALL
+++ b/INSTALL
@@ -4,10 +4,9 @@ Quick guide for the patience-impaired
 
 At the top level directory, run:
 
- $ make BACKEND=qdbm ENABLE_TIPC=0 ENABLE_SCTP=0 install
+ $ make ENABLE_TIPC=0 ENABLE_SCTP=0 install
 
-to build and install the server without TIPC and SCTP support, and to use the
-qdbm backend.
+to build and install the server without TIPC and SCTP support.
 
 
 How to compile and install
@@ -19,35 +18,37 @@ The only mandatory requisite to build nmdb is libevent
 There are several build-time options, in two groups: supported network
 protocols, and backend databases.
 
-The network protocols can all be enabled at the same time:
+The network protocols:
+
  * TCP and UDP, the well-known network protocols.
  * TIPC (http://tipc.sf.net/), a cluster-oriented network protocol. You will
 	need a Linux kernel >= 2.6.16 with TIPC support (most distributions
 	enable it by default).
  * SCTP, a network protocol similar to UDP and TCP, offering reliable message
- 	passing over IP, among other very useful things. You will need the
-	lksctp-tools package.
+	passing over IP, among other very useful things. You will need the
+	libsctp-dev (or equivalent) package.
+
+The backend databases:
 
-The backends, on the other hand, are mutually exclusive and only one can be
-selected:
- * qdbm (http://qdbm.sf.net/).
- * bdb (http://www.oracle.com/database/berkeley-db/).
+ * qdbm (http://qdbm.sf.net/)
+ * bdb (http://www.oracle.com/database/berkeley-db/)
  * Tokyo Cabinet (http://1978th.net/tokyocabinet/)
  * tdb (http://tdb.samba.org/)
- * A null backend, if you don't need a real one.
+ * A null backend (to use when you don't need a real one)
 
-By default, all network protocols are enabled, and the qdbm backend is
-selected.
+By default, all network protocols are enabled, and backends are automatically
+detected according to the available libraries.
 
 You can change the defaults by passing parameters to make, like this:
 
- $ make BACKEND=[qbdm|bdb|tc|tdb|null] ENABLE_$PROTO=[1|0]
+ $ make BE_ENABLE_$BACKEND=[1|0] ENABLE_$PROTO=[1|0]
 
-Where $PROTO can be TCP, UDP, TIPC or SCTP.
+Where $PROTO can be TCP, UDP, TIPC or SCTP, and $BACKEND can be QDBM, BDB, TC,
+TDB or NULL.
 
 For instance, to build with bdb backend and without TIPC and UDP support, use:
 
- $ make BACKEND=bdb ENABLE_TIPC=0
+ $ make BE_ENABLE_BDB=1 ENABLE_TIPC=0
 
 
 Tests
diff --git a/nmdb/Makefile b/nmdb/Makefile
index 9d94451..ffdd658 100644
--- a/nmdb/Makefile
+++ b/nmdb/Makefile
@@ -5,15 +5,29 @@ ENABLE_TCP = 1
 ENABLE_UDP = 1
 ENABLE_SCTP = 1
 
-# Backend to use, can be qdbm, bdb, tc, tdb, or null
-BACKEND = qdbm
+# Backends to enable
+BE_ENABLE_QDBM := $(shell if `pkg-config --exists qdbm`; \
+	then echo 1; else echo 0; fi)
+BE_ENABLE_BDB := 0
+BE_ENABLE_TC := $(shell if `pkg-config --exists tokyocabinet`; \
+	then echo 1; else echo 0; fi)
+BE_ENABLE_TDB := $(shell if `pkg-config --exists tdb`; \
+	then echo 1; else echo 0; fi)
+BE_ENABLE_NULL := 1
+
 
 CFLAGS += -std=c99 -pedantic -Wall -O3
 ALL_CFLAGS = -D_XOPEN_SOURCE=600 $(CFLAGS)
 ALL_CFLAGS += -DENABLE_TIPC=$(ENABLE_TIPC) \
 		-DENABLE_TCP=$(ENABLE_TCP) \
 		-DENABLE_UDP=$(ENABLE_UDP) \
-		-DENABLE_SCTP=$(ENABLE_SCTP)
+		-DENABLE_SCTP=$(ENABLE_SCTP) \
+		-DBE_ENABLE_QDBM=$(BE_ENABLE_QDBM) \
+		-DBE_ENABLE_BDB=$(BE_ENABLE_BDB) \
+		-DBE_ENABLE_TC=$(BE_ENABLE_TC) \
+		-DBE_ENABLE_TDB=$(BE_ENABLE_TDB) \
+		-DBE_ENABLE_NULL=$(BE_ENABLE_NULL) \
+
 
 ifdef DEBUG
 ALL_CFLAGS += -g
@@ -32,7 +46,8 @@ endif
 PREFIX=/usr/local
 
 
-OBJS = cache.o dbloop.o queue.o log.o net.o netutils.o parse.o stats.o main.o
+OBJS = cache.o dbloop.o queue.o log.o net.o netutils.o parse.o stats.o main.o \
+       be.o be-bdb.o be-null.o be-qdbm.o be-tc.o be-tdb.o
 LIBS = -levent -lpthread -lrt
 
 
@@ -60,32 +75,23 @@ else
 	OBJS += sctp-stub.o
 endif
 
-# Use series of ifeq-endif instead of else-ifeq because otherwise the nesting
-# is a mess. Using "else ifeq ..." in the same line is only supported from
-# gmake 3.81, which is too new.
-ifeq ($(BACKEND), qdbm)
-	OBJS += be-qdbm.o
-	ALL_CFLAGS += `pkg-config qdbm --cflags` -DBACKEND_QDBM
+
+ifeq ($(BE_ENABLE_QDBM), 1)
+	ALL_CFLAGS += `pkg-config qdbm --cflags`
 	LIBS += `pkg-config qdbm --libs-only-L` -lqdbm
 endif
-ifeq ($(BACKEND), bdb)
-	OBJS += be-bdb.o
-	ALL_CFLAGS += -DBACKEND_BDB
+ifeq ($(BE_ENABLE_BDB), 1)
 	LIBS += -ldb
 endif
-ifeq ($(BACKEND), tc)
-	OBJS += be-tc.o
-	ALL_CFLAGS += `pkg-config tokyocabinet --cflags` -DBACKEND_TC
+ifeq ($(BE_ENABLE_TC), 1)
+	ALL_CFLAGS += `pkg-config tokyocabinet --cflags`
 	LIBS += `pkg-config tokyocabinet --libs`
 endif
-ifeq ($(BACKEND), tdb)
-	OBJS += be-tdb.o
-	ALL_CFLAGS += `pkg-config tdb --cflags` -DBACKEND_TDB
+ifeq ($(BE_ENABLE_TDB), 1)
+	ALL_CFLAGS += `pkg-config tdb --cflags`
 	LIBS += `pkg-config tdb --libs`
 endif
-ifeq ($(BACKEND), null)
-	OBJS += be-null.o
-	ALL_CFLAGS += -DBACKEND_NULL
+ifeq ($(BE_ENABLE_NULL), 1)
 endif
 
 
diff --git a/nmdb/be-bdb.c b/nmdb/be-bdb.c
index 4a930f8..8233fd3 100644
--- a/nmdb/be-bdb.c
+++ b/nmdb/be-bdb.c
@@ -1,43 +1,76 @@
 
+#if BE_ENABLE_BDB
+
 #include <string.h>	/* memset() */
+#include <stddef.h>	/* NULL */
+
+/* typedefs to work around db.h include bug */
+typedef unsigned int u_int;
+typedef unsigned long u_long;
+#include <db.h>
+
 #include "be.h"
 
 
-db_t *db_open(const char *name, int flags)
+int bdb_set(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t vsize);
+int bdb_get(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t *vsize);
+int bdb_del(struct db_conn *db, const unsigned char *key, size_t ksize);
+int bdb_close(struct db_conn *db);
+
+
+struct db_conn *bdb_open(const char *name, int flags)
 {
 	int rv;
-	db_t *db;
+	struct db_conn *db;
+	DB *bdb_db;
 
-	rv = db_create(&db, NULL, 0);
+	rv = db_create(&bdb_db, NULL, 0);
 	if (rv != 0)
 		return NULL;
 
-	rv = db->open(db, NULL, name, NULL, DB_HASH, DB_CREATE, 0);
+	rv = bdb_db->open(bdb_db, NULL, name, NULL, DB_HASH, DB_CREATE, 0);
 	if (rv != 0) {
-		db->close(db, 0);
+		bdb_db->close(bdb_db, 0);
 		return NULL;
 	}
 
+	db = malloc(sizeof(struct db_conn));
+	if (db == NULL) {
+		bdb_db->close(bdb_db, 0);
+		return NULL;
+
+	db->conn = bdb_db;
+	db->set = bdb_set;
+	db->get = bdb_get;
+	db->del = bdb_del;
+	db->close = bdb_close;
+
 	return db;
 }
 
 
-int db_close(db_t *db)
+int bdb_close(struct db_conn *db)
 {
 	int rv;
+	DB *bdb_db = (DB *) db->conn;
 
-	rv = db->close(db, 0);
+	rv = bdb_db->close(bdb_db, 0);
 	if (rv != 0)
 		return 0;
+
+	free(db);
 	return 1;
 }
 
 
-int db_set(db_t *db, const unsigned char *key, size_t ksize,
+int bdb_set(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t vsize)
 {
 	int rv;
 	DBT k, v;
+	DB *bdb_db = (DB *) db->conn;
 
 	memset(&k, 0, sizeof(DBT));
 	memset(&v, 0, sizeof(DBT));
@@ -49,18 +82,19 @@ int db_set(db_t *db, const unsigned char *key, size_t ksize,
 	v.data = val;
 	v.size = vsize;
 
-	rv = db->put(db, NULL, &k, &v, 0);
+	rv = bdb_db->put(bdb_db, NULL, &k, &v, 0);
 	if (rv != 0)
 		return 0;
 	return 1;
 }
 
 
-int db_get(db_t *db, const unsigned char *key, size_t ksize,
+int bdb_get(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t *vsize)
 {
 	int rv;
 	DBT k, v;
+	DB *bdb_db = (DB *) db->conn;
 
 	memset(&k, 0, sizeof(DBT));
 	memset(&v, 0, sizeof(DBT));
@@ -71,7 +105,7 @@ int db_get(db_t *db, const unsigned char *key, size_t ksize,
 	v.ulen = *vsize;
 	v.flags = DB_DBT_USERMEM;	/* we supplied the memory */
 
-	rv = db->get(db, NULL, &k, &v, 0);
+	rv = bdb_db->get(bdb_db, NULL, &k, &v, 0);
 	if (rv != 0) {
 		return 0;
 	} else {
@@ -80,10 +114,11 @@ int db_get(db_t *db, const unsigned char *key, size_t ksize,
 	}
 }
 
-int db_del(db_t *db, const unsigned char *key, size_t ksize)
+int bdb_del(struct db_conn *db, const unsigned char *key, size_t ksize)
 {
 	int rv;
 	DBT k, v;
+	DB *bdb_db = (DB *) db->conn;
 
 	memset(&k, 0, sizeof(DBT));
 	memset(&v, 0, sizeof(DBT));
@@ -91,9 +126,20 @@ int db_del(db_t *db, const unsigned char *key, size_t ksize)
 	k.data = key;
 	k.size = ksize;
 
-	rv = db->del(db, NULL, &k, 0);
+	rv = bdb_db->del(bdb_db, NULL, &k, 0);
 	if (rv != 0)
 		return 0;
 	return 1;
 }
 
+#else
+
+#include <stddef.h>	/* NULL */
+
+struct db_conn *bdb_open(const char *name, int flags)
+{
+	return NULL;
+}
+
+#endif
+
diff --git a/nmdb/be-null.c b/nmdb/be-null.c
index c358fa0..79d67b6 100644
--- a/nmdb/be-null.c
+++ b/nmdb/be-null.c
@@ -1,37 +1,70 @@
 
+#if BE_ENABLE_NULL
+
 #include <stddef.h>	/* size_t */
+#include <stdlib.h>	/* malloc() and friends */
 #include "be.h"
 
 
-db_t *db_open(const char *name, int flags)
+int null_set(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t vsize);
+int null_get(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t *vsize);
+int null_del(struct db_conn *db, const unsigned char *key, size_t ksize);
+int null_close(struct db_conn *db);
+
+
+struct db_conn *null_open(const char *name, int flags)
 {
-	/* Use a dumb not-null pointer because it is never looked at outside
-	 * the functions defined here */
-	return (db_t *) 1;
+	struct db_conn *db;
+
+	db = malloc(sizeof(struct db_conn));
+	if (db == NULL)
+		return NULL;
+
+	db->conn = NULL;
+	db->set = null_set;
+	db->get = null_get;
+	db->del = null_del;
+	db->close = null_close;
+
+	return db;
 }
 
 
-int db_close(db_t *db)
+int null_close(struct db_conn *db)
 {
+	free(db);
 	return 1;
 }
 
 
-int db_set(db_t *db, const unsigned char *key, size_t ksize,
+int null_set(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t vsize)
 {
 	return 1;
 }
 
 
-int db_get(db_t *db, const unsigned char *key, size_t ksize,
+int null_get(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t *vsize)
 {
 	return 0;
 }
 
-int db_del(db_t *db, const unsigned char *key, size_t ksize)
+int null_del(struct db_conn *db, const unsigned char *key, size_t ksize)
 {
 	return 0;
 }
 
+#else
+
+#include <stddef.h>	/* NULL */
+
+struct db_conn *null_open(const char *name, int flags)
+{
+	return NULL;
+}
+
+#endif
+
diff --git a/nmdb/be-qdbm.c b/nmdb/be-qdbm.c
index 3c98f9b..409fecf 100644
--- a/nmdb/be-qdbm.c
+++ b/nmdb/be-qdbm.c
@@ -1,38 +1,71 @@
 
+#if BE_ENABLE_QDBM
+
 #include <depot.h>	/* QDBM's Depot API */
 #include <stdlib.h>
 
 #include "be.h"
 
 
-db_t *db_open(const char *name, int flags)
+int qdbm_set(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t vsize);
+int qdbm_get(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t *vsize);
+int qdbm_del(struct db_conn *db, const unsigned char *key, size_t ksize);
+int qdbm_close(struct db_conn *db);
+
+
+struct db_conn *qdbm_open(const char *name, int flags)
 {
 	int f;
+	struct db_conn *db;
+	DEPOT *qdbm_db;
 
 	f = DP_OREADER | DP_OWRITER | DP_ONOLCK | DP_OCREAT;
-	return dpopen(name, f, 0);
+	qdbm_db = dpopen(name, f, 0);
+	if (qdbm_db == NULL)
+		return NULL;
+
+	db = malloc(sizeof(struct db_conn));
+	if (db == NULL) {
+		dpclose(qdbm_db);
+		return NULL;
+	}
+
+	db->conn = qdbm_db;
+	db->set = qdbm_set;
+	db->get = qdbm_get;
+	db->del = qdbm_del;
+	db->close = qdbm_close;
+
+	return db;
 }
 
 
-int db_close(db_t *db)
+int qdbm_close(struct db_conn *db)
 {
-	return dpclose(db);
+	int rv;
+
+	rv = dpclose(db->conn);
+	free(db);
+	return rv;
 }
 
 
-int db_set(db_t *db, const unsigned char *key, size_t ksize,
+int qdbm_set(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t vsize)
 {
-	return dpput(db, (char *) key, ksize, (char *) val, vsize, DP_DOVER);
+	return dpput(db->conn, (char *) key, ksize,
+			(char *) val, vsize, DP_DOVER);
 }
 
 
-int db_get(db_t *db, const unsigned char *key, size_t ksize,
+int qdbm_get(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t *vsize)
 {
 	int rv;
 
-	rv = dpgetwb(db, (char *) key, ksize, 0, *vsize, (char *) val);
+	rv = dpgetwb(db->conn, (char *) key, ksize, 0, *vsize, (char *) val);
 	if (rv >= 0) {
 		*vsize = rv;
 		return 1;
@@ -41,8 +74,19 @@ int db_get(db_t *db, const unsigned char *key, size_t ksize,
 	}
 }
 
-int db_del(db_t *db, const unsigned char *key, size_t ksize)
+int qdbm_del(struct db_conn *db, const unsigned char *key, size_t ksize)
 {
-	return dpout(db, (char *) key, ksize);
+	return dpout(db->conn, (char *) key, ksize);
 }
 
+#else
+
+#include <stddef.h>	/* NULL */
+
+struct db_conn *qdbm_open(const char *name, int flags)
+{
+	return NULL;
+}
+
+#endif
+
diff --git a/nmdb/be-tc.c b/nmdb/be-tc.c
index 11f5267..a035626 100644
--- a/nmdb/be-tc.c
+++ b/nmdb/be-tc.c
@@ -1,42 +1,67 @@
 
+#if BE_ENABLE_TC
+
 #include <tchdb.h>	/* Tokyo Cabinet's hash API */
 #include <stdlib.h>
 
 #include "be.h"
 
 
-db_t *db_open(const char *name, int flags)
+int tc_set(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t vsize);
+int tc_get(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t *vsize);
+int tc_del(struct db_conn *db, const unsigned char *key, size_t ksize);
+int tc_close(struct db_conn *db);
+
+
+struct db_conn *tc_open(const char *name, int flags)
 {
-	db_t *db = tchdbnew();
+	struct db_conn *db;
+	TCHDB *tc_db = tchdbnew();
 
-	if (!tchdbopen(db, name, HDBOWRITER | HDBOCREAT))
+	if (!tchdbopen(qdbm_db, name, HDBOWRITER | HDBOCREAT))
 		return NULL;
 
+	db = malloc(sizeof(struct db_conn));
+	if (db == NULL) {
+		tchdbclose(tc_db);
+		tchdbdel(tc_db);
+		return NULL;
+	}
+
+	db->conn = tc_db;
+	db->set = tc_set;
+	db->get = tc_get;
+	db->del = tc_del;
+	db->close = tc_close;
+
 	return db;
 }
 
 
-int db_close(db_t *db)
+int tc_close(struct db_conn *db)
 {
-	int r = tchdbclose(db);
-	tchdbdel(db);
+	int r = tchdbclose(db->conn);
+	tchdbdel(db->conn);
+	free(db);
 	return r;
 }
 
 
-int db_set(db_t *db, const unsigned char *key, size_t ksize,
+int tc_set(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t vsize)
 {
-	return tchdbput(db, key, ksize, val, vsize);
+	return tchdbput(db->conn, key, ksize, val, vsize);
 }
 
 
-int db_get(db_t *db, const unsigned char *key, size_t ksize,
+int tc_get(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t *vsize)
 {
 	int rv;
 
-	rv = tchdbget3(db, key, ksize, val, *vsize);
+	rv = tchdbget3(db->conn, key, ksize, val, *vsize);
 	if (rv >= 0) {
 		*vsize = rv;
 		return 1;
@@ -45,8 +70,19 @@ int db_get(db_t *db, const unsigned char *key, size_t ksize,
 	}
 }
 
-int db_del(db_t *db, const unsigned char *key, size_t ksize)
+int tc_del(struct db_conn *db, const unsigned char *key, size_t ksize)
 {
-	return tchdbout(db, key, ksize);
+	return tchdbout(db->conn, key, ksize);
 }
 
+#else
+
+#include <stddef.h>	/* NULL */
+
+struct db_conn *tc_open(const char *name, int flags)
+{
+	return NULL;
+}
+
+#endif
+
diff --git a/nmdb/be-tdb.c b/nmdb/be-tdb.c
index 7c9f8a0..9a578bb 100644
--- a/nmdb/be-tdb.c
+++ b/nmdb/be-tdb.c
@@ -1,28 +1,63 @@
 
+#if BE_ENABLE_TDB
+
 #include <string.h>	/* memcpy() */
+#include <stdlib.h>	/* malloc() and friends */
 
 /* tdb.h needs mode_t defined externally, and it is defined in one of these
 (which are the ones required for open() */
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
+#include <tdb.h>
 
 #include "be.h"
 
+/* Local operations names are prepended with an 'x' so they don't collide with
+ * tdb real functions */
 
-db_t *db_open(const char *name, int flags)
-{
-	return tdb_open(name, 0, 0, O_CREAT | O_RDWR, 0640);
-}
+int xtdb_set(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t vsize);
+int xtdb_get(struct db_conn *db, const unsigned char *key, size_t ksize,
+		unsigned char *val, size_t *vsize);
+int xtdb_del(struct db_conn *db, const unsigned char *key, size_t ksize);
+int xtdb_close(struct db_conn *db);
 
 
-int db_close(db_t *db)
+struct db_conn *xtdb_open(const char *name, int flags)
 {
-	return tdb_close(db) == 0;
+	struct db_conn *db;
+	TDB_CONTEXT *tdb_db;
+
+	tdb_db = tdb_open(name, 0, 0, O_CREAT | O_RDWR, 0640);
+	if (tdb_db == NULL)
+		return NULL;
+
+	db = malloc(sizeof(struct db_conn));
+	if (db == NULL) {
+		tdb_close(tdb_db);
+		return NULL;
+	}
+
+	db->conn = tdb_db;
+	db->set = xtdb_set;
+	db->get = xtdb_get;
+	db->del = xtdb_del;
+	db->close = xtdb_close;
+
+	return db;
 }
 
+int xtdb_close(struct db_conn *db)
+{
+	int rv;
+
+	rv = tdb_close(db->conn);
+	free(db);
+	return rv == 0;
+}
 
-int db_set(db_t *db, const unsigned char *key, size_t ksize,
+int xtdb_set(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t vsize)
 {
 	TDB_DATA k, v;
@@ -34,11 +69,11 @@ int db_set(db_t *db, const unsigned char *key, size_t ksize,
 	v.dptr = val;
 	v.dsize = vsize;
 
-	return tdb_store(db, k, v, TDB_REPLACE) == 0;
+	return tdb_store(db->conn, k, v, TDB_REPLACE) == 0;
 }
 
 
-int db_get(db_t *db, const unsigned char *key, size_t ksize,
+int xtdb_get(struct db_conn *db, const unsigned char *key, size_t ksize,
 		unsigned char *val, size_t *vsize)
 {
 	TDB_DATA k, v;
@@ -46,7 +81,7 @@ int db_get(db_t *db, const unsigned char *key, size_t ksize,
 	k.dptr = key;
 	k.dsize = ksize;
 
-	v = tdb_fetch(db, k);
+	v = tdb_fetch(db->conn, k);
 	if (v.dptr == NULL)
 		return 0;
 
@@ -59,13 +94,24 @@ int db_get(db_t *db, const unsigned char *key, size_t ksize,
 	return 1;
 }
 
-int db_del(db_t *db, const unsigned char *key, size_t ksize)
+int xtdb_del(struct db_conn *db, const unsigned char *key, size_t ksize)
 {
 	TDB_DATA k;
 
 	k.dptr = key;
 	k.dsize = ksize;
 
-	return tdb_delete(db, k) == 0;
+	return tdb_delete(db->conn, k) == 0;
 }
 
+#else
+
+#include <stddef.h>	/* NULL */
+
+struct db_conn *xtdb_open(const char *name, int flags)
+{
+	return NULL;
+}
+
+#endif
+
diff --git a/nmdb/be.c b/nmdb/be.c
new file mode 100644
index 0000000..01f3d9a
--- /dev/null
+++ b/nmdb/be.c
@@ -0,0 +1,45 @@
+
+#include <string.h>		/* strcmp() */
+#include "be.h"
+
+/* Openers for each supported backend, defined on each be-X.c file */
+struct db_conn *qdbm_open(const char *name, int flags);
+struct db_conn *bdb_open(const char *name, int flags);
+struct db_conn *tc_open(const char *name, int flags);
+struct db_conn *xtdb_open(const char *name, int flags);
+struct db_conn *null_open(const char *name, int flags);
+
+
+struct db_conn *db_open(enum backend_type type, const char *name, int flags)
+{
+	switch (type) {
+	case BE_QDBM:
+		return qdbm_open(name, flags);
+	case BE_BDB:
+		return bdb_open(name, flags);
+	case BE_TC:
+		return tc_open(name, flags);
+	case BE_TDB:
+		return xtdb_open(name, flags);
+	case BE_NULL:
+		return null_open(name, flags);
+	default:
+		return NULL;
+	}
+}
+
+enum backend_type be_type_from_str(const char *name)
+{
+	if (strcmp(name, "qdbm") == 0)
+		return BE_ENABLE_QDBM ? BE_QDBM : BE_UNSUPPORTED;
+	if (strcmp(name, "bdb") == 0)
+		return BE_ENABLE_BDB ? BE_BDB : BE_UNSUPPORTED;
+	if (strcmp(name, "tc") == 0)
+		return BE_ENABLE_TC ? BE_TC : BE_UNSUPPORTED;
+	if (strcmp(name, "tdb") == 0)
+		return BE_ENABLE_TDB ? BE_TDB : BE_UNSUPPORTED;
+	if (strcmp(name, "null") == 0)
+		return BE_ENABLE_NULL ? BE_NULL : BE_UNSUPPORTED;
+	return BE_UNKNOWN;
+}
+
diff --git a/nmdb/be.h b/nmdb/be.h
index f320361..7f9e3a1 100644
--- a/nmdb/be.h
+++ b/nmdb/be.h
@@ -2,45 +2,96 @@
 #ifndef _BE_H
 #define _BE_H
 
-/* Depending on our backend, we define db_t to be different things so the
- * generic code doesn't have to care about which backend we're using. */
-#if defined BACKEND_QDBM
-  #include <depot.h>
-  typedef DEPOT db_t;
-
-#elif defined BACKEND_BDB
-  /* typedefs to work around db.h include bug */
-  typedef unsigned int u_int;
-  typedef unsigned long u_long;
-  #include <db.h>
-  typedef DB db_t;
-
-#elif defined BACKEND_TC
-  #include <tchdb.h>
-  typedef TCHDB db_t;
-
-#elif defined BACKEND_TDB
-  #include <tdb.h>
-  typedef TDB_CONTEXT db_t;
-
-#elif defined BACKEND_NULL
-  typedef int db_t;
+#include <stddef.h>	/* size_t */
 
+struct db_conn {
+	/* This is where the backend sets a reference to the connection, which
+	 * will be properly casted when needed */
+	void *conn;
+
+	/* Operations */
+	int (*set)(struct db_conn *db, const unsigned char *key, size_t ksize,
+			unsigned char *val, size_t vsize);
+	int (*get)(struct db_conn *db, const unsigned char *key, size_t ksize,
+			unsigned char *val, size_t *vsize);
+	int (*del)(struct db_conn *db, const unsigned char *key, size_t ksize);
+	int (*close)(struct db_conn *db);
+
+};
+
+enum backend_type {
+	/* The first two are special, used to indicate unknown and unsupported
+	 * backends */
+	BE_UNKNOWN = -2,
+	BE_UNSUPPORTED = -1,
+	BE_QDBM = 1,
+	BE_BDB,
+	BE_TC,
+	BE_TDB,
+	BE_NULL,
+};
+
+/* Generic opener that knows about all the backends */
+struct db_conn *db_open(enum backend_type type, const char *name, int flags);
+
+/* Returns the backend type for the given name. */
+enum backend_type be_type_from_str(const char *name);
+
+/* String containing a list of all supported backends */
+#if BE_ENABLE_QDBM
+  #define _QDBM_SUPP "qdbm "
 #else
-  #error "Unknown backend"
-  /* Define it anyway, so this is the only warning/error the user sees. */
-  typedef int db_t;
+  #define _QDBM_SUPP ""
 #endif
 
+#if BE_ENABLE_BDB
+  #define _BDB_SUPP "bdb "
+#else
+  #define _BDB_SUPP ""
+#endif
 
-db_t *db_open(const char *name, int flags);
-int db_close(db_t *db);
-int db_set(db_t *db, const unsigned char *key, size_t ksize,
-		unsigned char *val, size_t vsize);
-int db_get(db_t *db, const unsigned char *key, size_t ksize,
-		unsigned char *val, size_t *vsize);
-int db_del(db_t *db, const unsigned char *key, size_t ksize);
+#if BE_ENABLE_TC
+  #define _TC_SUPP "tc "
+#else
+  #define _TC_SUPP ""
+#endif
 
+#if BE_ENABLE_TDB
+  #define _TDB_SUPP "tdb "
+#else
+  #define _TDB_SUPP ""
 #endif
 
+#if BE_ENABLE_NULL
+  #define _NULL_SUPP "null "
+#else
+  #define _NULL_SUPP ""
+#endif
+
+#define SUPPORTED_BE _QDBM_SUPP _BDB_SUPP _TC_SUPP _TDB_SUPP _NULL_SUPP
+
+
+/* Default backend */
+#if BE_ENABLE_TDB
+  #define DEFAULT_BE BE_TDB
+  #define DEFAULT_BE_NAME "tdb"
+#elif BE_ENABLE_TC
+  #define DEFAULT_BE BE_TC
+  #define DEFAULT_BE_NAME "tc"
+#elif BE_ENABLE_QDBM
+  #define DEFAULT_BE BE_QDBM
+  #define DEFAULT_BE_NAME "qdbm"
+#elif BE_ENABLE_BDB
+  #define DEFAULT_BE BE_BDB
+  #define DEFAULT_BE_NAME "bdb"
+#elif BE_ENABLE_NULL
+  #warning "using null backend as the default"
+  #define DEFAULT_BE BE_NULL
+  #define DEFAULT_BE_NAME "null"
+#else
+  #error "no backend available"
+#endif
+
+
+#endif
 
diff --git a/nmdb/common.h b/nmdb/common.h
index a1b129e..53b17ed 100644
--- a/nmdb/common.h
+++ b/nmdb/common.h
@@ -13,6 +13,7 @@ extern struct cache *cache_table;
 extern struct queue *op_queue;
 
 /* Settings */
+#include "be.h"
 struct settings {
 	int tipc_lower;
 	int tipc_upper;
@@ -27,6 +28,7 @@ struct settings {
 	int passive;
 	char *dbname;
 	char *logfname;
+	enum backend_type backend;
 };
 extern struct settings settings;
 
diff --git a/nmdb/dbloop.c b/nmdb/dbloop.c
index bb20504..4fe75b7 100644
--- a/nmdb/dbloop.c
+++ b/nmdb/dbloop.c
@@ -18,7 +18,7 @@
 
 
 static void *db_loop(void *arg);
-static void process_op(db_t *db, struct queue_entry *e);
+static void process_op(struct db_conn *db, struct queue_entry *e);
 
 
 /* Used to signal the loop that it should exit when the queue becomes empty.
@@ -26,7 +26,7 @@ static void process_op(db_t *db, struct queue_entry *e);
 static int loop_should_stop = 0;
 
 
-pthread_t *db_loop_start(db_t *db)
+pthread_t *db_loop_start(struct db_conn *db)
 {
 	pthread_t *thread;
 
@@ -53,9 +53,9 @@ static void *db_loop(void *arg)
 	int rv;
 	struct timespec ts;
 	struct queue_entry *e;
-	db_t *db;
+	struct db_conn *db;
 
-	db = (db_t *) arg;
+	db = (struct db_conn *) arg;
 
 	for (;;) {
 		/* Condition waits are specified with absolute timeouts, see
@@ -102,11 +102,11 @@ static void *db_loop(void *arg)
 	return NULL;
 }
 
-static void process_op(db_t *db, struct queue_entry *e)
+static void process_op(struct db_conn *db, struct queue_entry *e)
 {
 	int rv;
 	if (e->operation == REQ_SET) {
-		rv = db_set(db, e->key, e->ksize, e->val, e->vsize);
+		rv = db->set(db, e->key, e->ksize, e->val, e->vsize);
 		if (!(e->req->flags & FLAGS_SYNC))
 			return;
 
@@ -125,7 +125,7 @@ static void process_op(db_t *db, struct queue_entry *e)
 			e->req->reply_err(e->req, ERR_MEM);
 			return;
 		}
-		rv = db_get(db, e->key, e->ksize, val, &vsize);
+		rv = db->get(db, e->key, e->ksize, val, &vsize);
 		if (rv == 0) {
 			e->req->reply_mini(e->req, REP_NOTIN);
 			free(val);
@@ -135,7 +135,7 @@ static void process_op(db_t *db, struct queue_entry *e)
 		free(val);
 
 	} else if (e->operation == REQ_DEL) {
-		rv = db_del(db, e->key, e->ksize);
+		rv = db->del(db, e->key, e->ksize);
 		if (!(e->req->flags & FLAGS_SYNC))
 			return;
 
@@ -155,7 +155,7 @@ static void process_op(db_t *db, struct queue_entry *e)
 			e->req->reply_err(e->req, ERR_MEM);
 			return;
 		}
-		rv = db_get(db, e->key, e->ksize, dbval, &dbvsize);
+		rv = db->get(db, e->key, e->ksize, dbval, &dbvsize);
 		if (rv == 0) {
 			e->req->reply_mini(e->req, REP_NOTIN);
 			free(dbval);
@@ -165,7 +165,8 @@ static void process_op(db_t *db, struct queue_entry *e)
 		if (e->vsize == dbvsize &&
 				memcmp(e->val, dbval, dbvsize) == 0) {
 			/* Swap */
-			rv = db_set(db, e->key, e->ksize, e->newval, e->nvsize);
+			rv = db->set(db, e->key, e->ksize,
+					e->newval, e->nvsize);
 			if (!rv) {
 				e->req->reply_err(e->req, ERR_DB);
 				return;
@@ -189,7 +190,7 @@ static void process_op(db_t *db, struct queue_entry *e)
 			e->req->reply_err(e->req, ERR_MEM);
 			return;
 		}
-		rv = db_get(db, e->key, e->ksize, dbval, &dbvsize);
+		rv = db->get(db, e->key, e->ksize, dbval, &dbvsize);
 		if (rv == 0) {
 			e->req->reply_mini(e->req, REP_NOTIN);
 			free(dbval);
@@ -215,7 +216,7 @@ static void process_op(db_t *db, struct queue_entry *e)
 		snprintf((char *) dbval, dbvsize, "%23lld",
 				(long long int) intval);
 
-		rv = db_set(db, e->key, e->ksize, dbval, dbvsize);
+		rv = db->set(db, e->key, e->ksize, dbval, dbvsize);
 		if (!rv) {
 			e->req->reply_err(e->req, ERR_DB);
 			return;
diff --git a/nmdb/dbloop.h b/nmdb/dbloop.h
index a4a9d63..ddb3664 100644
--- a/nmdb/dbloop.h
+++ b/nmdb/dbloop.h
@@ -3,9 +3,9 @@
 #define _DBLOOP_H
 
 #include <pthread.h>		/* for pthread_t */
-#include "be.h"			/* for db_t */
+#include "be.h"			/* for struct db_conn */
 
-pthread_t *db_loop_start(db_t *db);
+pthread_t *db_loop_start(struct db_conn *db);
 void db_loop_stop(pthread_t *thread);
 
 #endif
diff --git a/nmdb/main.c b/nmdb/main.c
index 79d92b4..b638c26 100644
--- a/nmdb/main.c
+++ b/nmdb/main.c
@@ -13,6 +13,7 @@
 #include "net-const.h"
 #include "log.h"
 #include "stats.h"
+#include "be.h"
 
 #define DEFDBNAME "database"
 
@@ -28,6 +29,7 @@ static void help(void) {
 	char h[] = \
 	  "nmdb [options]\n"
 	  "\n"
+	  "  -b backend	backend to use (" DEFAULT_BE_NAME ")\n"
 	  "  -d dbpath	database path ('database' by default)\n"
 	  "  -l lower	TIPC lower port number (10)\n"
 	  "  -L upper	TIPC upper port number (= lower)\n"
@@ -43,6 +45,8 @@ static void help(void) {
 	  "  -p		enable passive mode, for redundancy purposes (read docs.)\n"
 	  "  -h		show this help\n"
 	  "\n"
+	  "Available backends: " SUPPORTED_BE "\n"
+	  "\n"
 	  "Please report bugs to Alberto Bertogli (albertito@blitiri.com.ar)\n"
 	  "\n";
 	printf("%s", h);
@@ -65,12 +69,17 @@ static int load_settings(int argc, char **argv)
 	settings.foreground = 0;
 	settings.passive = 0;
 	settings.logfname = "-";
+	settings.backend = DEFAULT_BE;
 
 	settings.dbname = malloc(strlen(DEFDBNAME) + 1);
 	strcpy(settings.dbname, DEFDBNAME);
 
-	while ((c = getopt(argc, argv, "d:l:L:t:T:u:U:s:S:c:o:fph?")) != -1) {
+	while ((c = getopt(argc, argv,
+				"b:d:l:L:t:T:u:U:s:S:c:o:fph?")) != -1) {
 		switch(c) {
+		case 'b':
+			settings.backend = be_type_from_str(optarg);
+			break;
 		case 'd':
 			free(settings.dbname);
 			settings.dbname = malloc(strlen(optarg) + 1);
@@ -150,6 +159,14 @@ static int load_settings(int argc, char **argv)
 	if (settings.numobjs == -1)
 		settings.numobjs = 128 * 1024;
 
+	if (settings.backend == BE_UNKNOWN) {
+		printf("Error: unknown backend\n");
+		return 0;
+	} else if (settings.backend == BE_UNSUPPORTED) {
+		printf("Error: unsupported backend\n");
+		return 0;
+	}
+
 	return 1;
 }
 
@@ -158,7 +175,7 @@ int main(int argc, char **argv)
 {
 	struct cache *cd;
 	struct queue *q;
-	db_t *db;
+	struct db_conn *db;
 	pid_t pid;
 	pthread_t *dbthread;
 
@@ -186,7 +203,7 @@ int main(int argc, char **argv)
 	}
 	op_queue = q;
 
-	db = db_open(settings.dbname, 0);
+	db = db_open(settings.backend, settings.dbname, 0);
 	if (db == NULL) {
 		errlog("Error opening DB");
 		return 1;
@@ -214,7 +231,7 @@ int main(int argc, char **argv)
 
 	db_loop_stop(dbthread);
 
-	db_close(db);
+	db->close(db);
 
 	queue_free(q);