git » libjio » commit 5b0626c

[ABI BREAK] Implement automatic syncing

author Alberto Bertogli
2009-04-11 03:38:38 UTC
committer Alberto Bertogli
2009-04-12 13:51:17 UTC
parent e35082e8f739c065afc88907389c01a4d699d6b7

[ABI BREAK] Implement automatic syncing

This patch implements automatic syncing, which works by creating a thread that
will periodically call jsync(), which is useful when working with lingering
transactions, which improve performance quite a bit.

As a side effect, it fixes some lingering transaction bugs introduced with the
isolation of the journal code.

Signed-off-by: Alberto Bertogli <albertito@blitiri.com.ar>

UPGRADING +2 -0
bindings/python2/libjio.c +53 -0
bindings/python3/libjio.c +53 -0
libjio/Makefile +3 -2
libjio/autosync.c +124 -0
libjio/common.h +2 -0
libjio/journal.c +3 -3
libjio/journal.h +1 -0
libjio/libjio.3 +17 -0
libjio/libjio.h +7 -0
libjio/trans.c +9 -2

diff --git a/UPGRADING b/UPGRADING
index e5a4201..966e096 100644
--- a/UPGRADING
+++ b/UPGRADING
@@ -6,6 +6,8 @@ You should always clean all your files before upgrading. While I don't expect
 the transaction on-disk format to change, it's a good practise and it doesn't
 take much effort. When it's mandatory, it will be noted.
 
+-> 0.26
+  - Applications need to be recompiled due to a change in the jfs structure.
 
 -> 0.25
   - It is no longer necessary to pass O_SYNC to jopen() if lingering
diff --git a/bindings/python2/libjio.c b/bindings/python2/libjio.c
index a29f8f1..85ce8d1 100644
--- a/bindings/python2/libjio.c
+++ b/bindings/python2/libjio.c
@@ -319,6 +319,55 @@ static PyObject *jf_jmove_journal(jfile_object *fp, PyObject *args)
 	return PyLong_FromLong(rv);
 }
 
+/* jfs_autosync_start() */
+PyDoc_STRVAR(jf_autosync_start__doc,
+"autosync_start(max_sec, max_bytes)\n\
+\n\
+Starts the automatic sync thread (only useful when using lingering\n\
+transactions).\n");
+
+static PyObject *jf_autosync_start(jfile_object *fp, PyObject *args)
+{
+	int rv;
+	unsigned int max_sec, max_bytes;
+
+	if (!PyArg_ParseTuple(args, "II:autosync_start", &max_sec,
+				&max_bytes))
+		return NULL;
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = jfs_autosync_start(fp->fs, max_sec, max_bytes);
+	Py_END_ALLOW_THREADS
+
+	if (rv != 0)
+		return PyErr_SetFromErrno(PyExc_IOError);
+
+	return PyLong_FromLong(rv);
+}
+
+/* jfs_autosync_stop() */
+PyDoc_STRVAR(jf_autosync_stop__doc,
+"autosync_stop()\n\
+\n\
+Stops the automatic sync thread started by autosync_start()\n");
+
+static PyObject *jf_autosync_stop(jfile_object *fp, PyObject *args)
+{
+	int rv;
+
+	if (!PyArg_ParseTuple(args, ":autosync_stop"))
+		return NULL;
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = jfs_autosync_stop(fp->fs);
+	Py_END_ALLOW_THREADS
+
+	if (rv != 0)
+		return PyErr_SetFromErrno(PyExc_IOError);
+
+	return PyLong_FromLong(rv);
+}
+
 /* new_trans */
 PyDoc_STRVAR(jf_new_trans__doc,
 "new_trans()\n\
@@ -365,6 +414,10 @@ static PyMethodDef jfile_methods[] = {
 	{ "jsync", (PyCFunction) jf_jsync, METH_VARARGS, jf_jsync__doc },
 	{ "jmove_journal", (PyCFunction) jf_jmove_journal, METH_VARARGS,
 		jf_jmove_journal__doc },
+	{ "autosync_start", (PyCFunction) jf_autosync_start, METH_VARARGS,
+		jf_autosync_start__doc },
+	{ "autosync_stop", (PyCFunction) jf_autosync_stop, METH_VARARGS,
+		jf_autosync_stop__doc },
 	{ "new_trans", (PyCFunction) jf_new_trans, METH_VARARGS,
 		jf_new_trans__doc },
 	{ NULL }
diff --git a/bindings/python3/libjio.c b/bindings/python3/libjio.c
index 7d63691..79e93ce 100644
--- a/bindings/python3/libjio.c
+++ b/bindings/python3/libjio.c
@@ -320,6 +320,55 @@ static PyObject *jf_jmove_journal(jfile_object *fp, PyObject *args)
 	return PyLong_FromLong(rv);
 }
 
+/* jfs_autosync_start() */
+PyDoc_STRVAR(jf_autosync_start__doc,
+"autosync_start(max_sec, max_bytes)\n\
+\n\
+Starts the automatic sync thread (only useful when using lingering\n\
+transactions).\n");
+
+static PyObject *jf_autosync_start(jfile_object *fp, PyObject *args)
+{
+	int rv;
+	unsigned int max_sec, max_bytes;
+
+	if (!PyArg_ParseTuple(args, "II:autosync_start", &max_sec,
+				&max_bytes))
+		return NULL;
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = jfs_autosync_start(fp->fs, max_sec, max_bytes);
+	Py_END_ALLOW_THREADS
+
+	if (rv != 0)
+		return PyErr_SetFromErrno(PyExc_IOError);
+
+	return PyLong_FromLong(rv);
+}
+
+/* jfs_autosync_stop() */
+PyDoc_STRVAR(jf_autosync_stop__doc,
+"autosync_stop()\n\
+\n\
+Stops the automatic sync thread started by autosync_start()\n");
+
+static PyObject *jf_autosync_stop(jfile_object *fp, PyObject *args)
+{
+	int rv;
+
+	if (!PyArg_ParseTuple(args, ":autosync_stop"))
+		return NULL;
+
+	Py_BEGIN_ALLOW_THREADS
+	rv = jfs_autosync_stop(fp->fs);
+	Py_END_ALLOW_THREADS
+
+	if (rv != 0)
+		return PyErr_SetFromErrno(PyExc_IOError);
+
+	return PyLong_FromLong(rv);
+}
+
 /* new_trans */
 PyDoc_STRVAR(jf_new_trans__doc,
 "new_trans()\n\
@@ -365,6 +414,10 @@ static PyMethodDef jfile_methods[] = {
 	{ "jsync", (PyCFunction) jf_jsync, METH_VARARGS, jf_jsync__doc },
 	{ "jmove_journal", (PyCFunction) jf_jmove_journal, METH_VARARGS,
 		jf_jmove_journal__doc },
+	{ "autosync_start", (PyCFunction) jf_autosync_start, METH_VARARGS,
+		jf_autosync_start__doc },
+	{ "autosync_stop", (PyCFunction) jf_autosync_stop, METH_VARARGS,
+		jf_autosync_stop__doc },
 	{ "new_trans", (PyCFunction) jf_new_trans, METH_VARARGS,
 		jf_new_trans__doc },
 	{ NULL }
diff --git a/libjio/Makefile b/libjio/Makefile
index 72e4e18..6ccb28f 100644
--- a/libjio/Makefile
+++ b/libjio/Makefile
@@ -10,7 +10,7 @@ MANDATORY_LDFLAGS := $(shell getconf LFS_LIBS 2>/dev/null)
 ALL_CFLAGS += $(CFLAGS) $(MANDATORY_CFLAGS) -fPIC
 ALL_LDFLAGS += $(LDFLAGS) $(MANDATORY_LDFLAGS) -fPIC
 
-LIBS = -lpthread
+LIBS = -lpthread -lrt
 
 ifdef DEBUG
 ALL_CFLAGS += -g
@@ -39,7 +39,8 @@ endif
 
 
 # objects to build
-OBJS = checksum.o common.o compat.o trans.o check.o journal.o unix.o ansi.o
+OBJS = autosync.o checksum.o common.o compat.o trans.o check.o journal.o \
+       unix.o ansi.o
 
 # rules
 default: all
diff --git a/libjio/autosync.c b/libjio/autosync.c
new file mode 100644
index 0000000..2ba0fe3
--- /dev/null
+++ b/libjio/autosync.c
@@ -0,0 +1,124 @@
+
+#include <pthread.h>	/* pthread_* */
+#include <errno.h>	/* ETIMEDOUT */
+#include <signal.h>	/* sig_atomic_t */
+#include <stdlib.h>	/* malloc() and friends */
+#include <time.h>	/* clock_gettime() */
+
+#include "common.h"
+#include "libjio.h"
+
+struct autosync_cfg {
+	struct jfs *fs;
+	pthread_t tid;
+
+	time_t max_sec;
+	size_t max_bytes;
+
+	/* When the thread must die, we set this to 1 */
+	sig_atomic_t must_die;
+
+	/* Condition variable to wake up the thread */
+	pthread_cond_t cond;
+	pthread_mutex_t mutex;
+};
+
+/* Thread that performes the automatic syncing */
+static void *autosync_thread(void *arg)
+{
+	int rv;
+	void *had_errors;
+	struct timespec ts;
+	struct autosync_cfg *cfg;
+
+	cfg = (struct autosync_cfg *) arg;
+
+	/* had_errors is a void * just to avoid weird casts, since we want to
+	 * return it, but it's used as a boolean */
+	had_errors = (void *) 0;
+
+	pthread_mutex_lock(&cfg->mutex);
+	for (;;) {
+		clock_gettime(CLOCK_REALTIME, &ts);
+		ts.tv_sec += cfg->max_sec;
+
+		rv = pthread_cond_timedwait(&cfg->cond, &cfg->mutex, &ts);
+		if (rv != 0 && rv != ETIMEDOUT)
+			break;
+
+		if (cfg->must_die)
+			break;
+
+		/* cover from spurious wakeups */
+		if (rv != ETIMEDOUT && cfg->fs->ltrans_len < cfg->max_bytes)
+			continue;
+
+		rv = jsync(cfg->fs);
+		if (rv != 0)
+			had_errors = (void *) 1;
+
+	}
+	pthread_mutex_unlock(&cfg->mutex);
+
+	pthread_exit(had_errors);
+	return NULL;
+}
+
+/* Starts the autosync thread, which will perform a jsync() every max_sec
+ * seconds, or every max_bytes written using lingering transactions. */
+int jfs_autosync_start(struct jfs *fs, time_t max_sec, size_t max_bytes)
+{
+	struct autosync_cfg *cfg;
+
+	if (fs->as_cfg != NULL)
+		return -1;
+
+	cfg = malloc(sizeof(struct autosync_cfg));
+	if (cfg == NULL)
+		return -1;
+
+	cfg->fs = fs;
+	cfg->max_sec = max_sec;
+	cfg->max_bytes = max_bytes;
+	cfg->must_die = 0;
+	pthread_cond_init(&cfg->cond, NULL);
+	pthread_mutex_init(&cfg->mutex, NULL);
+
+	fs->as_cfg = cfg;
+
+	return pthread_create(&cfg->tid, NULL, &autosync_thread, cfg);
+}
+
+/* Stops the autosync thread started by jfs_autosync_start(). It's
+ * automatically called in jclose(). */
+int jfs_autosync_stop(struct jfs *fs)
+{
+	int rv = 0;
+	void *had_errors;
+
+	if (fs->as_cfg == NULL)
+		return 0;
+
+	fs->as_cfg->must_die = 1;
+	pthread_cond_signal(&fs->as_cfg->cond);
+	pthread_join(fs->as_cfg->tid, &had_errors);
+
+	if (had_errors)
+		rv = -1;
+
+	pthread_cond_destroy(&fs->as_cfg->cond);
+	pthread_mutex_destroy(&fs->as_cfg->mutex);
+	free(fs->as_cfg);
+	fs->as_cfg = NULL;
+
+	return rv;
+}
+
+/* Used internally to notify the autosync thread that the number of bytes has
+ * been exceeded. Must be called with fs' ltlock held. */
+void autosync_check(struct jfs *fs)
+{
+	if (fs->ltrans_len > fs->as_cfg->max_bytes)
+		pthread_cond_signal(&fs->as_cfg->cond);
+}
+
diff --git a/libjio/common.h b/libjio/common.h
index c946591..536c514 100644
--- a/libjio/common.h
+++ b/libjio/common.h
@@ -39,5 +39,7 @@ void get_jtfile(struct jfs *fs, unsigned int tid, char *jtfile);
 int checksum(int fd, size_t len, uint32_t *csum);
 uint32_t checksum_map(uint8_t *map, size_t count);
 
+void autosync_check(struct jfs *fs);
+
 #endif
 
diff --git a/libjio/journal.c b/libjio/journal.c
index fc9c243..3b63402 100644
--- a/libjio/journal.c
+++ b/libjio/journal.c
@@ -122,8 +122,7 @@ static int fsync_dir(int fd)
  */
 
 /* Creates a new transaction in the journal, returns a pointer to an opaque
- * jop_t (that is freed using journal_free), or NULL if there was an error.
- * The transaction cannot be modified until journal_free() is called. */
+ * jop_t (that is freed using journal_free), or NULL if there was an error. */
 struct journal_op *journal_new(struct jtrans *ts)
 {
 	int fd, id;
@@ -156,6 +155,7 @@ struct journal_op *journal_new(struct jtrans *ts)
 	jop->name = name;
 	jop->curpos = 0;
 	jop->ts = ts;
+	jop->fs = ts->fs;
 
 	fiu_exit_on("jio/commit/created_tf");
 
@@ -304,7 +304,7 @@ int journal_free(struct journal_op *jop)
 	unlink(jop->name);
 
 	fiu_exit_on("jio/commit/pre_ok_free_tid");
-	free_tid(jop->ts->fs, jop->ts->id);
+	free_tid(jop->fs, jop->id);
 
 	close(jop->fd);
 
diff --git a/libjio/journal.h b/libjio/journal.h
index fa1129c..5a4666c 100644
--- a/libjio/journal.h
+++ b/libjio/journal.h
@@ -11,6 +11,7 @@ struct journal_op {
 	char *name;
 	off_t curpos;
 	struct jtrans *ts;
+	struct jfs *fs;
 };
 
 typedef struct journal_op jop_t;
diff --git a/libjio/libjio.3 b/libjio/libjio.3
index dcdf8fc..b7ad911 100644
--- a/libjio/libjio.3
+++ b/libjio/libjio.3
@@ -48,6 +48,10 @@ libjio - A library for Journaled I/O
 
 .BI "int jsync(struct jfs *" fs ");"
 .BI "int jmove_journal(struct jfs *" fs ", const char *" newpath ");"
+.BI "int jfs_autosync_start(struct jfs *" fs ", time_t " max_sec ","
+.BI "           size_t " max_bytes ");"
+.BI "int jfs_autosync_stop(struct jfs *" fs ");"
+
 
 .BI "int jfsck(const char *" name ", const char *" jdir ","
 .BI "           struct jfsck_result *" res ");"
@@ -119,6 +123,19 @@ can be used to move the journal directory to a new location. It can be called
 only when nobody else is using the file. It is usually not used, except for
 very special cases.
 
+.B jfs_autosync_start()
+can be used to start a thread which will automatically perform a
+.B jsync()
+after the given number of seconds or the given number of bytes written using
+lingering transactions (whatever comes first). It's very useful when using
+lingering transactions.
+.B jfs_autosync_stop()
+stops the thread started by
+.BR jfs_autosync_start() .
+The thread is also stopped automatically when
+.B jclose()
+is called.
+
 There are two functions that differ from the rest, which are
 .BR jfsck() " and " jfsck_cleanup() .
 Both take as the first two parameters the path to the file to check and the
diff --git a/libjio/libjio.h b/libjio/libjio.h
index aeea19e..e922791 100644
--- a/libjio/libjio.h
+++ b/libjio/libjio.h
@@ -26,6 +26,7 @@
 /* empty declarations, the API does not expose these */
 struct jlinger;
 struct joper;
+struct autosync_cfg;
 
 /* the main file structure */
 struct jfs {
@@ -37,8 +38,10 @@ struct jfs {
 	unsigned int *jmap;	/* journal's lock file mmap area */
 	uint32_t flags;		/* journal flags */
 	struct jlinger *ltrans;	/* lingered transactions */
+	size_t ltrans_len;	/* length of all the lingered transactions */
 	pthread_mutex_t ltlock;	/* lingered transactions' lock */
 	pthread_mutex_t lock;	/* a soft lock used in some operations */
+	struct autosync_cfg *as_cfg; /* autosync config */
 };
 
 /* a transaction */
@@ -74,6 +77,10 @@ int jsync(struct jfs *fs);
 int jmove_journal(struct jfs *fs, const char *newpath);
 int jclose(struct jfs *fs);
 
+/* autosync */
+int jfs_autosync_start(struct jfs *fs, time_t max_sec, size_t max_bytes);
+int jfs_autosync_stop(struct jfs *fs);
+
 
 /* journal checker */
 int jfsck(const char *name, const char *jdir, struct jfsck_result *res);
diff --git a/libjio/trans.c b/libjio/trans.c
index e4a6e98..34bb33c 100644
--- a/libjio/trans.c
+++ b/libjio/trans.c
@@ -217,6 +217,8 @@ ssize_t jtrans_commit(struct jtrans *ts)
 		pthread_mutex_lock(&(ts->fs->ltlock));
 		linger->next = ts->fs->ltrans;
 		ts->fs->ltrans = linger;
+		ts->fs->ltrans_len += written;
+		autosync_check(ts->fs);
 		pthread_mutex_unlock(&(ts->fs->ltlock));
 	} else {
 		if (have_sync_range) {
@@ -390,6 +392,7 @@ int jopen(struct jfs *fs, const char *name, int flags, int mode, int jflags)
 	fs->jdir = NULL;
 	fs->jdirfd = -1;
 	fs->jmap = MAP_FAILED;
+	fs->as_cfg = NULL;
 
 	/* we provide either read-only or read-write access, because when we
 	 * commit a transaction we read the current contents before applying,
@@ -407,6 +410,7 @@ int jopen(struct jfs *fs, const char *name, int flags, int mode, int jflags)
 	fs->name = strdup(name);
 	fs->flags = jflags;
 	fs->ltrans = NULL;
+	fs->ltrans_len = 0;
 
 	/* Note on fs->lock usage: this lock is used only to protect the file
 	 * pointer. This means that it must only be held while performing
@@ -500,12 +504,11 @@ int jsync(struct jfs *fs)
 	if (fs->fd < 0)
 		return -1;
 
-	rv = fsync(fs->fd);
+	rv = fdatasync(fs->fd);
 	if (rv != 0)
 		return rv;
 
 	pthread_mutex_lock(&(fs->ltlock));
-	ltmp = fs->ltrans;
 	while (fs->ltrans != NULL) {
 		fiu_exit_on("jio/jsync/pre_unlink");
 		journal_free(fs->ltrans->jop);
@@ -515,6 +518,7 @@ int jsync(struct jfs *fs)
 		fs->ltrans = ltmp;
 	}
 
+	fs->ltrans_len = 0;
 	pthread_mutex_unlock(&(fs->ltlock));
 	return 0;
 }
@@ -588,6 +592,9 @@ int jclose(struct jfs *fs)
 
 	ret = 0;
 
+	if (jfs_autosync_stop(fs))
+		ret = -1;
+
 	if (! (fs->flags & J_RDONLY)) {
 		if (jsync(fs))
 			ret = -1;