/*
 * nmdb - dbloop.c
 * (source: git » nmdb » master » tree; path: [master] / nmdb / dbloop.c)
 */

#include <pthread.h>		/* threading functions */
#include <time.h>		/* nanosleep() */
#include <errno.h>		/* ETIMEDOUT */
#include <string.h>		/* memcmp() */
#include <stdlib.h>		/* malloc()/free() */
#include <stdio.h>		/* snprintf() */
#include <stdint.h>		/* int64_t, uint64_t */

#include "common.h"
#include "dbloop.h"
#include "be.h"
#include "queue.h"
#include "net-const.h"
#include "req.h"
#include "log.h"
#include "netutils.h"
#include "sparse.h"


/* Forward declarations for the file-local helpers defined further down. */
static void *db_loop(void *arg);
static void process_op(struct db_conn *db, struct queue_entry *e);


/* Stop request for db_loop(): once it is set, the loop exits the next time
 * it finds the queue empty. As a file-scope static it starts out as 0.
 * Not the cleanest mechanism, but it is simple and effective. */
static int loop_should_stop;


/* Start the database worker thread, which runs db_loop() on db.
 *
 * Returns a heap-allocated thread handle, to be passed to db_loop_stop(),
 * or NULL if the handle could not be allocated or the thread could not be
 * created. */
pthread_t *db_loop_start(struct db_conn *db)
{
	pthread_t *thread;

	thread = malloc(sizeof(*thread));
	if (thread == NULL)
		return NULL;

	/* On failure pthread_create() leaves *thread undefined. Returning it
	 * would make db_loop_stop() join a bogus thread, so report the
	 * failure the same way as an allocation failure. */
	if (pthread_create(thread, NULL, db_loop, db) != 0) {
		free(thread);
		return NULL;
	}

	return thread;
}

/* Ask the worker started by db_loop_start() to finish, wait for it to exit
 * and release its handle. */
void db_loop_stop(pthread_t *thread)
{
	pthread_t tid = *thread;

	/* db_loop() only breaks out once it sees this flag while the queue is
	 * empty, so already-queued operations are still carried out. */
	loop_should_stop = 1;
	(void) pthread_join(tid, NULL);
	free(thread);
}


/* Body of the database worker thread.
 *
 * Repeatedly takes one entry off op_queue, runs it against the backend
 * through process_op() and then frees it.
 *
 * arg is the struct db_conn * handed to db_loop_start().
 *
 * The thread returns NULL only when queue_get() yields no entry and
 * loop_should_stop has been set (by db_loop_stop()). Entries queued before
 * the stop request are therefore still processed, presumably as long as
 * queue_get() returns NULL only for an empty queue. */
static void *db_loop(void *arg)
{
	int rv;
	struct timespec ts;
	struct queue_entry *e;
	struct db_conn *db;

	db = (struct db_conn *) arg;

	for (;;) {
		/* Condition waits are specified with absolute timeouts, see
		 * pthread_cond_timedwait()'s SUSv3 specification for more
		 * information. We need to calculate it each time.
		 * We sleep for 1 sec. There's no real need for it to be too
		 * fast (it's only used so that stop detection doesn't take
		 * long), but we don't want it to be too slow either. */
		clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += 1;

		/* Wait, with the queue locked, until there is work or the
		 * deadline passes. The while loop re-checks emptiness after
		 * every wakeup. */
		rv = 0;
		queue_lock(op_queue);
		while (queue_isempty(op_queue) && rv == 0) {
			rv = queue_timedwait(op_queue, &ts);
		}

		if (rv != 0 && rv != ETIMEDOUT) {
			errlog("Error in queue_timedwait()");
			/* When the timedwait fails the lock is released, so
			 * we need to properly annotate this case. */
			/* NOTE(review): __release() appears to be a sparse
			 * lock-context annotation (sparse.h is included).
			 * This path depends on queue_timedwait() releasing the
			 * lock on error. Plain pthread_cond_timedwait() does
			 * not clearly promise that for errors other than
			 * ETIMEDOUT, so confirm against queue.c. This path
			 * also skips the loop_should_stop check below, so
			 * persistent errors would keep db_loop_stop() waiting
			 * forever. */
			__release(op_queue->lock);
			continue;
		}

		/* Either the queue has an entry or the wait timed out. The
		 * e == NULL check below relies on queue_get() returning NULL
		 * in the latter case. */
		e = queue_get(op_queue);
		queue_unlock(op_queue);

		if (e == NULL) {
			if (loop_should_stop) {
				break;
			} else {
				continue;
			}
		}

		/* Run the operation with the queue unlocked, so producers
		 * are not blocked on the backend. */
		process_op(db, e);

		/* Free the entry that was allocated when tipc queued the
		 * operation. This also frees its components. */
		queue_entry_free(e);
	}

	return NULL;
}

/* Size of the scratch buffers that receive values and keys from the
 * backend. */
enum { DBLOOP_BUFSIZE = 64 * 1024 };

/* REQ_SET: store the value. Reply only if the request is synchronous. */
static void dbop_set(struct db_conn *db, struct queue_entry *e)
{
	int rv;

	rv = db->set(db, e->key, e->ksize, e->val, e->vsize);
	if (!(e->req->flags & FLAGS_SYNC))
		return;

	if (!rv)
		e->req->reply_err(e->req, ERR_DB);
	else
		e->req->reply_mini(e->req, REP_OK);
}

/* REQ_GET: fetch the value and send it back, or REP_NOTIN if missing. */
static void dbop_get(struct db_conn *db, struct queue_entry *e)
{
	unsigned char *val;
	size_t vsize = DBLOOP_BUFSIZE;

	val = malloc(vsize);
	if (val == NULL) {
		e->req->reply_err(e->req, ERR_MEM);
		return;
	}

	if (db->get(db, e->key, e->ksize, val, &vsize) == 0)
		e->req->reply_mini(e->req, REP_NOTIN);
	else
		e->req->reply_long(e->req, REP_OK, val, vsize);

	free(val);
}

/* REQ_DEL: remove the key. Reply only if the request is synchronous. */
static void dbop_del(struct db_conn *db, struct queue_entry *e)
{
	int rv;

	rv = db->del(db, e->key, e->ksize);
	if (!(e->req->flags & FLAGS_SYNC))
		return;

	if (rv == 0)
		e->req->reply_mini(e->req, REP_NOTIN);
	else
		e->req->reply_mini(e->req, REP_OK);
}

/* REQ_CAS: if the stored value equals e->val, replace it with e->newval. */
static void dbop_cas(struct db_conn *db, struct queue_entry *e)
{
	unsigned char *dbval;
	size_t dbvsize = DBLOOP_BUFSIZE;

	dbval = malloc(dbvsize);
	if (dbval == NULL) {
		e->req->reply_err(e->req, ERR_MEM);
		return;
	}

	/* Compare */
	if (db->get(db, e->key, e->ksize, dbval, &dbvsize) == 0) {
		e->req->reply_mini(e->req, REP_NOTIN);
		goto out;
	}

	if (e->vsize != dbvsize || memcmp(e->val, dbval, dbvsize) != 0) {
		e->req->reply_mini(e->req, REP_NOMATCH);
		goto out;
	}

	/* Swap */
	if (!db->set(db, e->key, e->ksize, e->newval, e->nvsize)) {
		e->req->reply_err(e->req, ERR_DB);
		goto out;	/* previously leaked dbval here */
	}
	e->req->reply_mini(e->req, REP_OK);

out:
	free(dbval);
}

/* REQ_INCR: add the increment carried in e->val to the stored decimal
 * number, store the result and reply with it in network byte order. */
static void dbop_incr(struct db_conn *db, struct queue_entry *e)
{
	unsigned char *dbval;
	size_t dbvsize = DBLOOP_BUFSIZE;
	int64_t intval, increment;

	dbval = malloc(dbvsize);
	if (dbval == NULL) {
		e->req->reply_err(e->req, ERR_MEM);
		return;
	}

	if (db->get(db, e->key, e->ksize, dbval, &dbvsize) == 0) {
		e->req->reply_mini(e->req, REP_NOTIN);
		goto out;
	}

	/* val must be NULL terminated; see cache_incr(). An empty value
	 * cannot be. Testing dbvsize first also avoids reading
	 * dbval[dbvsize - 1] with a wrapped-around index. */
	if (dbvsize == 0 || dbval[dbvsize - 1] != '\0') {
		e->req->reply_mini(e->req, REP_NOMATCH);
		goto out;
	}

	/* e->val holds the increment as a native int64_t. Copy it out
	 * instead of dereferencing a cast pointer, which may be misaligned
	 * and violates strict aliasing. */
	memcpy(&increment, e->val, sizeof(increment));

	/* Add in unsigned arithmetic so overflow wraps instead of being
	 * undefined behavior. */
	intval = strtoll((char *) dbval, NULL, 10);
	intval = (int64_t) ((uint64_t) intval + (uint64_t) increment);

	/* "%23lld" needs 24 bytes including the terminator. dbval has
	 * DBLOOP_BUFSIZE bytes, so only dbvsize has to grow. */
	if (dbvsize < 24)
		dbvsize = 24;

	snprintf((char *) dbval, dbvsize, "%23lld", (long long int) intval);

	if (!db->set(db, e->key, e->ksize, dbval, dbvsize)) {
		e->req->reply_err(e->req, ERR_DB);
		goto out;	/* previously leaked dbval here */
	}

	intval = htonll(intval);
	e->req->reply_long(e->req, REP_OK,
			(unsigned char *) &intval, sizeof(intval));

out:
	free(dbval);
}

/* REQ_FIRSTKEY: reply with the backend's first key, if it supports it. */
static void dbop_firstkey(struct db_conn *db, struct queue_entry *e)
{
	unsigned char *key;
	size_t ksize = DBLOOP_BUFSIZE;

	if (db->firstkey == NULL) {
		e->req->reply_err(e->req, ERR_DB);
		return;
	}

	key = malloc(ksize);
	if (key == NULL) {
		e->req->reply_err(e->req, ERR_MEM);
		return;
	}

	if (db->firstkey(db, key, &ksize) == 0)
		e->req->reply_mini(e->req, REP_NOTIN);
	else
		e->req->reply_long(e->req, REP_OK, key, ksize);

	free(key);
}

/* REQ_NEXTKEY: reply with the key after e->key, if the backend supports
 * iteration. */
static void dbop_nextkey(struct db_conn *db, struct queue_entry *e)
{
	unsigned char *newkey;
	size_t nksize = DBLOOP_BUFSIZE;

	if (db->nextkey == NULL) {
		e->req->reply_err(e->req, ERR_DB);
		return;
	}

	newkey = malloc(nksize);
	if (newkey == NULL) {
		e->req->reply_err(e->req, ERR_MEM);
		return;
	}

	if (db->nextkey(db, e->key, e->ksize, newkey, &nksize) == 0)
		e->req->reply_mini(e->req, REP_NOTIN);
	else
		e->req->reply_long(e->req, REP_OK, newkey, nksize);

	free(newkey);
}

/* Execute one queued operation against the backend and send a reply
 * through e->req where one is due.
 *
 * SET and DEL reply only when the request carries FLAGS_SYNC. The other
 * known operations always reply. The entry itself is not freed here;
 * db_loop() frees it after this returns. */
static void process_op(struct db_conn *db, struct queue_entry *e)
{
	if (e->operation == REQ_SET)
		dbop_set(db, e);
	else if (e->operation == REQ_GET)
		dbop_get(db, e);
	else if (e->operation == REQ_DEL)
		dbop_del(db, e);
	else if (e->operation == REQ_CAS)
		dbop_cas(db, e);
	else if (e->operation == REQ_INCR)
		dbop_incr(db, e);
	else if (e->operation == REQ_FIRSTKEY)
		dbop_firstkey(db, e);
	else if (e->operation == REQ_NEXTKEY)
		dbop_nextkey(db, e);
	else
		/* NOTE(review): no reply is sent for unknown operations;
		 * confirm the client side does not wait for one. */
		wlog("Unknown op 0x%x\n", e->operation);
}