author | Alberto Bertogli
<albertito@gmail.com> 2006-09-13 05:37:04 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2006-09-13 05:37:04 UTC |
parent | 1bd1299e369bf9c7c73ff54377e1c9138f5fc4a6 |
nmdb/Makefile | +1 | -1 |
nmdb/db.c | +26 | -7 |
nmdb/queue.c | +34 | -18 |
nmdb/queue.h | +8 | -2 |
nmdb/tipc.c | +18 | -0 |
diff --git a/nmdb/Makefile b/nmdb/Makefile index 315609c..fe76217 100644 --- a/nmdb/Makefile +++ b/nmdb/Makefile @@ -20,7 +20,7 @@ default: all all: nmdb nmdb: $(OBJS) - $(CC) $(CFLAGS) $(OBJS) -levent -lpthread -lqdbm -o nmdb + $(CC) $(CFLAGS) $(OBJS) -levent -lpthread -lrt -lqdbm -o nmdb .c.o: $(CC) $(CFLAGS) -c $< -o $@ diff --git a/nmdb/db.c b/nmdb/db.c index 7fac2f7..e264ce4 100644 --- a/nmdb/db.c +++ b/nmdb/db.c @@ -1,6 +1,7 @@ #include <pthread.h> /* threading functions */ #include <time.h> /* nanosleep() */ +#include <errno.h> /* ETIMEDOUT */ #include <stdio.h> @@ -44,27 +45,45 @@ void db_loop_stop(pthread_t *thread) static void *db_loop(void *arg) { - struct queue_entry *e; + int rv; struct timespec ts; + struct queue_entry *e; db_t *db; db = (db_t *) arg; - /* We will sleep this amount of time when the queue is empty. It's - * hardcoded, but needs testing. Currenly 0.2s. */ - ts.tv_sec = 0; - ts.tv_nsec = 1000000 / 5; - for (;;) { + /* Condition waits are specified with absolute timeouts, see + * pthread_cond_timedwait()'s SUSv3 specification for more + * information. We need to calculate it each time. + * We sleep for 1 sec. There's no real need for it to be too + * fast (it's only used so that stop detection doesn't take + * long), but we don't want it to be too slow either. */ + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 1; + + rv = 0; + queue_lock(op_queue); + while (queue_isempty(op_queue) && rv == 0) { + rv = queue_timedwait(op_queue, &ts); + } + + if (rv != 0 && rv != ETIMEDOUT) { + perror("Error in queue_timedwait()"); + continue; + } + e = queue_get(op_queue); + queue_unlock(op_queue); + if (e == NULL) { if (loop_should_stop) { break; } else { - nanosleep(&ts, NULL); continue; } } + process_op(db, e); /* Free the entry that was allocated when tipc queued the diff --git a/nmdb/queue.c b/nmdb/queue.c index 3badf82..7eeda67 100644 --- a/nmdb/queue.c +++ b/nmdb/queue.c @@ -23,10 +23,11 @@ struct queue *queue_create() pthread_mutex_init(&(q->lock), &attr); pthread_mutexattr_destroy(&attr); + pthread_cond_init(&(q->cond), NULL); + return q; } - void queue_free(struct queue *q) { struct queue_entry *e; @@ -44,8 +45,25 @@ void queue_free(struct queue *q) } -#define queue_lock(q) do { pthread_mutex_lock(&((q)->lock)); } while (0) -#define queue_unlock(q) do { pthread_mutex_unlock(&((q)->lock)); } while (0) +void queue_lock(struct queue *q) +{ + pthread_mutex_lock(&(q->lock)); +} + +void queue_unlock(struct queue *q) +{ + pthread_mutex_unlock(&(q->lock)); +} + +void queue_signal(struct queue *q) +{ + pthread_cond_signal(&(q->cond)); +} + +int queue_timedwait(struct queue *q, struct timespec *ts) +{ + return pthread_cond_timedwait(&(q->cond), &(q->lock), ts); +} struct queue_entry *queue_entry_create() @@ -82,7 +100,6 @@ void queue_entry_free(struct queue_entry *e) { void queue_put(struct queue *q, struct queue_entry *e) { - queue_lock(q); if (q->top == NULL) { q->top = q->bottom = e; } else { @@ -90,30 +107,29 @@ void queue_put(struct queue *q, struct queue_entry *e) q->top = e; } q->size += 1; - queue_unlock(q); return; } - struct queue_entry *queue_get(struct queue *q) { struct queue_entry *e, *t; - queue_lock(q); - if (q->bottom == NULL) { - e = NULL; - } else { - e = q->bottom; - t = q->bottom->prev; - q->bottom = t; - if (t == NULL) { - /* it's empty now */ - q->top = NULL; - } + if (q->bottom == NULL) + return NULL; + + e = q->bottom; + t = q->bottom->prev; + q->bottom = t; + if (t == NULL) { + /* it's empty now */ + q->top = NULL; } q->size -= 1; - queue_unlock(q); return e; } +int queue_isempty(struct queue *q) +{ + return (q->size == 0); +} diff --git a/nmdb/queue.h b/nmdb/queue.h index 2933e1a..69a0002 100644 --- a/nmdb/queue.h +++ b/nmdb/queue.h @@ -8,8 +8,9 @@ struct queue { pthread_mutex_t lock; - size_t size; + pthread_cond_t cond; + size_t size; struct queue_entry *top, *bottom; }; @@ -35,9 +36,14 @@ void queue_free(struct queue *q); struct queue_entry *queue_entry_create(); void queue_entry_free(struct queue_entry *e); +void queue_lock(struct queue *q); +void queue_unlock(struct queue *q); +void queue_signal(struct queue *q); +int queue_timedwait(struct queue *q, struct timespec *ts); + void queue_put(struct queue *q, struct queue_entry *e); struct queue_entry *queue_get(struct queue *q); - +int queue_isempty(struct queue *q); #endif diff --git a/nmdb/tipc.c b/nmdb/tipc.c index 1148e48..da4731f 100644 --- a/nmdb/tipc.c +++ b/nmdb/tipc.c @@ -382,7 +382,10 @@ static void parse_get(struct req_info *req, int impact_db) rep_send_error(req, ERR_MEM); return; } + queue_lock(op_queue); queue_put(op_queue, e); + queue_unlock(op_queue); + queue_signal(op_queue); return; } else { tipc_reply_get(req, REP_CACHE_HIT, val, vsize); @@ -444,10 +447,19 @@ static void parse_set(struct req_info *req, int impact_db, int async) rep_send_error(req, ERR_MEM); return; } + queue_lock(op_queue); queue_put(op_queue, e); + queue_unlock(op_queue); if (async) { mini_reply(req, REP_OK); + } else { + /* Signal the DB thread it has work only if it's a + * synchronous operation, asynchronous don't mind + * waiting. It does have a measurable impact on + * performance (2083847usec vs 2804973usec for sets on + * "test2d 100000 10 10". */ + queue_signal(op_queue); } return; } else { @@ -493,11 +505,17 @@ static void parse_del(struct req_info *req, int impact_db, int async) rep_send_error(req, ERR_MEM); return; } + queue_lock(op_queue); queue_put(op_queue, e); + queue_unlock(op_queue); if (async) { mini_reply(req, REP_OK); + } else { + /* See comment on parse_set(). */ + queue_signal(op_queue); } + return; }