git » nmdb » commit 87d9d2f

Use conditional waits for waking up the db thread.

author Alberto Bertogli
2006-09-13 05:37:04 UTC
committer Alberto Bertogli
2006-09-13 05:37:04 UTC
parent 1bd1299e369bf9c7c73ff54377e1c9138f5fc4a6

Use conditional waits for waking up the db thread.
This patch changes the db thread wakeup mechanism to be conditional waits,
instead of just nanosleep as it used to be.

It has several advantages, the main ones being reduced latency for all
synchronous operations (including get, which is always synchronous on cache
misses) and reduced CPU usage when idle.

The wakeup is done conditionally, to avoid waking the thread up when an
asynchronous operation is queued. As there is no need for an immediate
response, we skip the wakeup: the db thread will notice the new entry when
its wait timer expires, and skipping the signal reduces the response latency.

To verify that this conditional wakeup was worthwhile, I ran
"test2d 10000 10 10" and compared the set and del times (in usec):

	cond	always
	wakeup	wakeup
---------------------------
set	2083847 2804973
del	1986236 2605961

It's a clear win for little effort.

nmdb/Makefile +1 -1
nmdb/db.c +26 -7
nmdb/queue.c +34 -18
nmdb/queue.h +8 -2
nmdb/tipc.c +18 -0

diff --git a/nmdb/Makefile b/nmdb/Makefile
index 315609c..fe76217 100644
--- a/nmdb/Makefile
+++ b/nmdb/Makefile
@@ -20,7 +20,7 @@ default: all
 all: nmdb
 
 nmdb: $(OBJS)
-	$(CC) $(CFLAGS) $(OBJS) -levent -lpthread -lqdbm -o nmdb
+	$(CC) $(CFLAGS) $(OBJS) -levent -lpthread -lrt -lqdbm -o nmdb
 
 .c.o:
 	$(CC) $(CFLAGS) -c $< -o $@
diff --git a/nmdb/db.c b/nmdb/db.c
index 7fac2f7..e264ce4 100644
--- a/nmdb/db.c
+++ b/nmdb/db.c
@@ -1,6 +1,7 @@
 
 #include <pthread.h>		/* threading functions */
 #include <time.h>		/* nanosleep() */
+#include <errno.h>		/* ETIMEDOUT */
 
 #include <stdio.h>
 
@@ -44,27 +45,45 @@ void db_loop_stop(pthread_t *thread)
 
 static void *db_loop(void *arg)
 {
-	struct queue_entry *e;
+	int rv;
 	struct timespec ts;
+	struct queue_entry *e;
 	db_t *db;
 
 	db = (db_t *) arg;
 
-	/* We will sleep this amount of time when the queue is empty. It's
-	 * hardcoded, but needs testing. Currenly 0.2s. */
-	ts.tv_sec = 0;
-	ts.tv_nsec = 1000000 / 5;
-
 	for (;;) {
+		/* Condition waits are specified with absolute timeouts, see
+		 * pthread_cond_timedwait()'s SUSv3 specification for more
+		 * information. We need to calculate it each time.
+		 * We sleep for 1 sec. There's no real need for it to be too
+		 * fast (it's only used so that stop detection doesn't take
+		 * long), but we don't want it to be too slow either. */
+		clock_gettime(CLOCK_REALTIME, &ts);
+		ts.tv_sec += 1;
+
+		rv = 0;
+		queue_lock(op_queue);
+		while (queue_isempty(op_queue) && rv == 0) {
+			rv = queue_timedwait(op_queue, &ts);
+		}
+
+		if (rv != 0 && rv != ETIMEDOUT) {
+			perror("Error in queue_timedwait()");
+			continue;
+		}
+
 		e = queue_get(op_queue);
+		queue_unlock(op_queue);
+
 		if (e == NULL) {
 			if (loop_should_stop) {
 				break;
 			} else {
-				nanosleep(&ts, NULL);
 				continue;
 			}
 		}
+
 		process_op(db, e);
 
 		/* Free the entry that was allocated when tipc queued the
diff --git a/nmdb/queue.c b/nmdb/queue.c
index 3badf82..7eeda67 100644
--- a/nmdb/queue.c
+++ b/nmdb/queue.c
@@ -23,10 +23,11 @@ struct queue *queue_create()
 	pthread_mutex_init(&(q->lock), &attr);
 	pthread_mutexattr_destroy(&attr);
 
+	pthread_cond_init(&(q->cond), NULL);
+
 	return q;
 }
 
-
 void queue_free(struct queue *q)
 {
 	struct queue_entry *e;
@@ -44,8 +45,25 @@ void queue_free(struct queue *q)
 }
 
 
-#define queue_lock(q) do { pthread_mutex_lock(&((q)->lock)); } while (0)
-#define queue_unlock(q) do { pthread_mutex_unlock(&((q)->lock)); } while (0)
+void queue_lock(struct queue *q)
+{
+	pthread_mutex_lock(&(q->lock));
+}
+
+void queue_unlock(struct queue *q)
+{
+	pthread_mutex_unlock(&(q->lock));
+}
+
+void queue_signal(struct queue *q)
+{
+	pthread_cond_signal(&(q->cond));
+}
+
+int queue_timedwait(struct queue *q, struct timespec *ts)
+{
+	return pthread_cond_timedwait(&(q->cond), &(q->lock), ts);
+}
 
 
 struct queue_entry *queue_entry_create()
@@ -82,7 +100,6 @@ void queue_entry_free(struct queue_entry *e) {
 
 void queue_put(struct queue *q, struct queue_entry *e)
 {
-	queue_lock(q);
 	if (q->top == NULL) {
 		q->top = q->bottom = e;
 	} else {
@@ -90,30 +107,29 @@ void queue_put(struct queue *q, struct queue_entry *e)
 		q->top = e;
 	}
 	q->size += 1;
-	queue_unlock(q);
 	return;
 }
 
-
 struct queue_entry *queue_get(struct queue *q)
 {
 	struct queue_entry *e, *t;
 
-	queue_lock(q);
-	if (q->bottom == NULL) {
-		e = NULL;
-	} else {
-		e = q->bottom;
-		t = q->bottom->prev;
-		q->bottom = t;
-		if (t == NULL) {
-			/* it's empty now */
-			q->top = NULL;
-		}
+	if (q->bottom == NULL)
+		return NULL;
+
+	e = q->bottom;
+	t = q->bottom->prev;
+	q->bottom = t;
+	if (t == NULL) {
+		/* it's empty now */
+		q->top = NULL;
 	}
 	q->size -= 1;
-	queue_unlock(q);
 	return e;
 }
 
+int queue_isempty(struct queue *q)
+{
+	return (q->size == 0);
+}
 
diff --git a/nmdb/queue.h b/nmdb/queue.h
index 2933e1a..69a0002 100644
--- a/nmdb/queue.h
+++ b/nmdb/queue.h
@@ -8,8 +8,9 @@
 
 struct queue {
 	pthread_mutex_t lock;
-	size_t size;
+	pthread_cond_t cond;
 
+	size_t size;
 	struct queue_entry *top, *bottom;
 };
 
@@ -35,9 +36,14 @@ void queue_free(struct queue *q);
 struct queue_entry *queue_entry_create();
 void queue_entry_free(struct queue_entry *e);
 
+void queue_lock(struct queue *q);
+void queue_unlock(struct queue *q);
+void queue_signal(struct queue *q);
+int queue_timedwait(struct queue *q, struct timespec *ts);
+
 void queue_put(struct queue *q, struct queue_entry *e);
 struct queue_entry *queue_get(struct queue *q);
-
+int queue_isempty(struct queue *q);
 
 #endif
 
diff --git a/nmdb/tipc.c b/nmdb/tipc.c
index 1148e48..da4731f 100644
--- a/nmdb/tipc.c
+++ b/nmdb/tipc.c
@@ -382,7 +382,10 @@ static void parse_get(struct req_info *req, int impact_db)
 			rep_send_error(req, ERR_MEM);
 			return;
 		}
+		queue_lock(op_queue);
 		queue_put(op_queue, e);
+		queue_unlock(op_queue);
+		queue_signal(op_queue);
 		return;
 	} else {
 		tipc_reply_get(req, REP_CACHE_HIT, val, vsize);
@@ -444,10 +447,19 @@ static void parse_set(struct req_info *req, int impact_db, int async)
 			rep_send_error(req, ERR_MEM);
 			return;
 		}
+		queue_lock(op_queue);
 		queue_put(op_queue, e);
+		queue_unlock(op_queue);
 
 		if (async) {
 			mini_reply(req, REP_OK);
+		} else {
+			/* Signal the DB thread it has work only if it's a
+			 * synchronous operation; asynchronous ones don't mind
+			 * waiting. It does have a measurable impact on
+			 * performance (2083847usec vs 2804973usec for sets on
+			 * "test2d 100000 10 10"). */
+			queue_signal(op_queue);
 		}
 		return;
 	} else {
@@ -493,11 +505,17 @@ static void parse_del(struct req_info *req, int impact_db, int async)
 			rep_send_error(req, ERR_MEM);
 			return;
 		}
+		queue_lock(op_queue);
 		queue_put(op_queue, e);
+		queue_unlock(op_queue);
 
 		if (async) {
 			mini_reply(req, REP_OK);
+		} else {
+			/* See comment on parse_set(). */
+			queue_signal(op_queue);
 		}
+
 		return;
 	}