author | Alberto Bertogli
<albertito@blitiri.com.ar> 2009-04-11 03:38:38 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2009-04-12 13:51:17 UTC |
parent | e35082e8f739c065afc88907389c01a4d699d6b7 |
UPGRADING | +2 | -0 |
bindings/python2/libjio.c | +53 | -0 |
bindings/python3/libjio.c | +53 | -0 |
libjio/Makefile | +3 | -2 |
libjio/autosync.c | +124 | -0 |
libjio/common.h | +2 | -0 |
libjio/journal.c | +3 | -3 |
libjio/journal.h | +1 | -0 |
libjio/libjio.3 | +17 | -0 |
libjio/libjio.h | +7 | -0 |
libjio/trans.c | +9 | -2 |
diff --git a/UPGRADING b/UPGRADING index e5a4201..966e096 100644 --- a/UPGRADING +++ b/UPGRADING @@ -6,6 +6,8 @@ You should always clean all your files before upgrading. While I don't expect the transaction on-disk format to change, it's a good practise and it doesn't take much effort. When it's mandatory, it will be noted. +-> 0.26 + - Applications need to be recompiled due to a change in the jfs structure. -> 0.25 - It is no longer necessary to pass O_SYNC to jopen() if lingering diff --git a/bindings/python2/libjio.c b/bindings/python2/libjio.c index a29f8f1..85ce8d1 100644 --- a/bindings/python2/libjio.c +++ b/bindings/python2/libjio.c @@ -319,6 +319,55 @@ static PyObject *jf_jmove_journal(jfile_object *fp, PyObject *args) return PyLong_FromLong(rv); } +/* jfs_autosync_start() */ +PyDoc_STRVAR(jf_autosync_start__doc, +"autosync_start(max_sec, max_bytes)\n\ +\n\ +Starts the automatic sync thread (only useful when using lingering\n\ +transactions).\n"); + +static PyObject *jf_autosync_start(jfile_object *fp, PyObject *args) +{ + int rv; + unsigned int max_sec, max_bytes; + + if (!PyArg_ParseTuple(args, "II:autosync_start", &max_sec, + &max_bytes)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + rv = jfs_autosync_start(fp->fs, max_sec, max_bytes); + Py_END_ALLOW_THREADS + + if (rv != 0) + return PyErr_SetFromErrno(PyExc_IOError); + + return PyLong_FromLong(rv); +} + +/* jfs_autosync_stop() */ +PyDoc_STRVAR(jf_autosync_stop__doc, +"autosync_stop()\n\ +\n\ +Stops the automatic sync thread started by autosync_start()\n"); + +static PyObject *jf_autosync_stop(jfile_object *fp, PyObject *args) +{ + int rv; + + if (!PyArg_ParseTuple(args, ":autosync_stop")) + return NULL; + + Py_BEGIN_ALLOW_THREADS + rv = jfs_autosync_stop(fp->fs); + Py_END_ALLOW_THREADS + + if (rv != 0) + return PyErr_SetFromErrno(PyExc_IOError); + + return PyLong_FromLong(rv); +} + /* new_trans */ PyDoc_STRVAR(jf_new_trans__doc, "new_trans()\n\ @@ -365,6 +414,10 @@ static PyMethodDef jfile_methods[] = { { "jsync", (PyCFunction) jf_jsync, METH_VARARGS, jf_jsync__doc }, { "jmove_journal", (PyCFunction) jf_jmove_journal, METH_VARARGS, jf_jmove_journal__doc }, + { "autosync_start", (PyCFunction) jf_autosync_start, METH_VARARGS, + jf_autosync_start__doc }, + { "autosync_stop", (PyCFunction) jf_autosync_stop, METH_VARARGS, + jf_autosync_stop__doc }, { "new_trans", (PyCFunction) jf_new_trans, METH_VARARGS, jf_new_trans__doc }, { NULL } diff --git a/bindings/python3/libjio.c b/bindings/python3/libjio.c index 7d63691..79e93ce 100644 --- a/bindings/python3/libjio.c +++ b/bindings/python3/libjio.c @@ -320,6 +320,55 @@ static PyObject *jf_jmove_journal(jfile_object *fp, PyObject *args) return PyLong_FromLong(rv); } +/* jfs_autosync_start() */ +PyDoc_STRVAR(jf_autosync_start__doc, +"autosync_start(max_sec, max_bytes)\n\ +\n\ +Starts the automatic sync thread (only useful when using lingering\n\ +transactions).\n"); + +static PyObject *jf_autosync_start(jfile_object *fp, PyObject *args) +{ + int rv; + unsigned int max_sec, max_bytes; + + if (!PyArg_ParseTuple(args, "II:autosync_start", &max_sec, + &max_bytes)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + rv = jfs_autosync_start(fp->fs, max_sec, max_bytes); + Py_END_ALLOW_THREADS + + if (rv != 0) + return PyErr_SetFromErrno(PyExc_IOError); + + return PyLong_FromLong(rv); +} + +/* jfs_autosync_stop() */ +PyDoc_STRVAR(jf_autosync_stop__doc, +"autosync_stop()\n\ +\n\ +Stops the automatic sync thread started by autosync_start()\n"); + +static PyObject *jf_autosync_stop(jfile_object *fp, PyObject *args) +{ + int rv; + + if (!PyArg_ParseTuple(args, ":autosync_stop")) + return NULL; + + Py_BEGIN_ALLOW_THREADS + rv = jfs_autosync_stop(fp->fs); + Py_END_ALLOW_THREADS + + if (rv != 0) + return PyErr_SetFromErrno(PyExc_IOError); + + return PyLong_FromLong(rv); +} + /* new_trans */ PyDoc_STRVAR(jf_new_trans__doc, "new_trans()\n\ @@ -365,6 +414,10 @@ static PyMethodDef jfile_methods[] = { { "jsync", (PyCFunction) jf_jsync, METH_VARARGS, jf_jsync__doc }, { "jmove_journal", (PyCFunction) jf_jmove_journal, METH_VARARGS, jf_jmove_journal__doc }, + { "autosync_start", (PyCFunction) jf_autosync_start, METH_VARARGS, + jf_autosync_start__doc }, + { "autosync_stop", (PyCFunction) jf_autosync_stop, METH_VARARGS, + jf_autosync_stop__doc }, { "new_trans", (PyCFunction) jf_new_trans, METH_VARARGS, jf_new_trans__doc }, { NULL } diff --git a/libjio/Makefile b/libjio/Makefile index 72e4e18..6ccb28f 100644 --- a/libjio/Makefile +++ b/libjio/Makefile @@ -10,7 +10,7 @@ MANDATORY_LDFLAGS := $(shell getconf LFS_LIBS 2>/dev/null) ALL_CFLAGS += $(CFLAGS) $(MANDATORY_CFLAGS) -fPIC ALL_LDFLAGS += $(LDFLAGS) $(MANDATORY_LDFLAGS) -fPIC -LIBS = -lpthread +LIBS = -lpthread -lrt ifdef DEBUG ALL_CFLAGS += -g @@ -39,7 +39,8 @@ endif # objects to build -OBJS = checksum.o common.o compat.o trans.o check.o journal.o unix.o ansi.o +OBJS = autosync.o checksum.o common.o compat.o trans.o check.o journal.o \ + unix.o ansi.o # rules default: all diff --git a/libjio/autosync.c b/libjio/autosync.c new file mode 100644 index 0000000..2ba0fe3 --- /dev/null +++ b/libjio/autosync.c @@ -0,0 +1,124 @@ + +#include <pthread.h> /* pthread_* */ +#include <errno.h> /* ETIMEDOUT */ +#include <signal.h> /* sig_atomic_t */ +#include <stdlib.h> /* malloc() and friends */ +#include <time.h> /* clock_gettime() */ + +#include "common.h" +#include "libjio.h" + +struct autosync_cfg { + struct jfs *fs; + pthread_t tid; + + time_t max_sec; + size_t max_bytes; + + /* When the thread must die, we set this to 1 */ + sig_atomic_t must_die; + + /* Condition variable to wake up the thread */ + pthread_cond_t cond; + pthread_mutex_t mutex; +}; + +/* Thread that performes the automatic syncing */ +static void *autosync_thread(void *arg) +{ + int rv; + void *had_errors; + struct timespec ts; + struct autosync_cfg *cfg; + + cfg = (struct autosync_cfg *) arg; + + /* had_errors is a void * just to avoid weird casts, since we want to + * return it, but it's used as a boolean */ + had_errors = (void *) 0; + + pthread_mutex_lock(&cfg->mutex); + for (;;) { + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += cfg->max_sec; + + rv = pthread_cond_timedwait(&cfg->cond, &cfg->mutex, &ts); + if (rv != 0 && rv != ETIMEDOUT) + break; + + if (cfg->must_die) + break; + + /* cover from spurious wakeups */ + if (rv != ETIMEDOUT && cfg->fs->ltrans_len < cfg->max_bytes) + continue; + + rv = jsync(cfg->fs); + if (rv != 0) + had_errors = (void *) 1; + + } + pthread_mutex_unlock(&cfg->mutex); + + pthread_exit(had_errors); + return NULL; +} + +/* Starts the autosync thread, which will perform a jsync() every max_sec + * seconds, or every max_bytes written using lingering transactions. */ +int jfs_autosync_start(struct jfs *fs, time_t max_sec, size_t max_bytes) +{ + struct autosync_cfg *cfg; + + if (fs->as_cfg != NULL) + return -1; + + cfg = malloc(sizeof(struct autosync_cfg)); + if (cfg == NULL) + return -1; + + cfg->fs = fs; + cfg->max_sec = max_sec; + cfg->max_bytes = max_bytes; + cfg->must_die = 0; + pthread_cond_init(&cfg->cond, NULL); + pthread_mutex_init(&cfg->mutex, NULL); + + fs->as_cfg = cfg; + + return pthread_create(&cfg->tid, NULL, &autosync_thread, cfg); +} + +/* Stops the autosync thread started by jfs_autosync_start(). It's + * automatically called in jclose(). */ +int jfs_autosync_stop(struct jfs *fs) +{ + int rv = 0; + void *had_errors; + + if (fs->as_cfg == NULL) + return 0; + + fs->as_cfg->must_die = 1; + pthread_cond_signal(&fs->as_cfg->cond); + pthread_join(fs->as_cfg->tid, &had_errors); + + if (had_errors) + rv = -1; + + pthread_cond_destroy(&fs->as_cfg->cond); + pthread_mutex_destroy(&fs->as_cfg->mutex); + free(fs->as_cfg); + fs->as_cfg = NULL; + + return rv; +} + +/* Used internally to notify the autosync thread that the number of bytes has + * been exceeded. Must be called with fs' ltlock held. */ +void autosync_check(struct jfs *fs) +{ + if (fs->ltrans_len > fs->as_cfg->max_bytes) + pthread_cond_signal(&fs->as_cfg->cond); +} + diff --git a/libjio/common.h b/libjio/common.h index c946591..536c514 100644 --- a/libjio/common.h +++ b/libjio/common.h @@ -39,5 +39,7 @@ void get_jtfile(struct jfs *fs, unsigned int tid, char *jtfile); int checksum(int fd, size_t len, uint32_t *csum); uint32_t checksum_map(uint8_t *map, size_t count); +void autosync_check(struct jfs *fs); + #endif diff --git a/libjio/journal.c b/libjio/journal.c index fc9c243..3b63402 100644 --- a/libjio/journal.c +++ b/libjio/journal.c @@ -122,8 +122,7 @@ static int fsync_dir(int fd) */ /* Creates a new transaction in the journal, returns a pointer to an opaque - * jop_t (that is freed using journal_free), or NULL if there was an error. - * The transaction cannot be modified until journal_free() is called. */ + * jop_t (that is freed using journal_free), or NULL if there was an error. */ struct journal_op *journal_new(struct jtrans *ts) { int fd, id; @@ -156,6 +155,7 @@ struct journal_op *journal_new(struct jtrans *ts) jop->name = name; jop->curpos = 0; jop->ts = ts; + jop->fs = ts->fs; fiu_exit_on("jio/commit/created_tf"); @@ -304,7 +304,7 @@ int journal_free(struct journal_op *jop) unlink(jop->name); fiu_exit_on("jio/commit/pre_ok_free_tid"); - free_tid(jop->ts->fs, jop->ts->id); + free_tid(jop->fs, jop->id); close(jop->fd); diff --git a/libjio/journal.h b/libjio/journal.h index fa1129c..5a4666c 100644 --- a/libjio/journal.h +++ b/libjio/journal.h @@ -11,6 +11,7 @@ struct journal_op { char *name; off_t curpos; struct jtrans *ts; + struct jfs *fs; }; typedef struct journal_op jop_t; diff --git a/libjio/libjio.3 b/libjio/libjio.3 index dcdf8fc..b7ad911 100644 --- a/libjio/libjio.3 +++ b/libjio/libjio.3 @@ -48,6 +48,10 @@ libjio - A library for Journaled I/O .BI "int jsync(struct jfs *" fs ");" .BI "int jmove_journal(struct jfs *" fs ", const char *" newpath ");" +.BI "int jfs_autosync_start(struct jfs *" fs ", time_t " max_sec "," +.BI " size_t " max_bytes ");" +.BI "int jfs_autosync_stop(struct jfs *" fs ");" + .BI "int jfsck(const char *" name ", const char *" jdir "," .BI " struct jfsck_result *" res ");" @@ -119,6 +123,19 @@ can be used to move the journal directory to a new location. It can be called only when nobody else is using the file. It is usually not used, except for very special cases. +.B jfs_autosync_start() +can be used to start a thread which will automatically perform a +.B jsync() +after the given number of seconds or the given number of bytes written using +lingering transactions (whatever comes first). It's very useful when using +lingering transactions. +.B jfs_autosync_stop() +stops the thread started by +.BR jfs_autosync_start() . +The thread is also stopped automatically when +.B jclose() +is called. + There are two functions that differ from the rest, which are .BR jfsck() " and " jfsck_cleanup() . Both take as the first two parameters the path to the file to check and the diff --git a/libjio/libjio.h b/libjio/libjio.h index aeea19e..e922791 100644 --- a/libjio/libjio.h +++ b/libjio/libjio.h @@ -26,6 +26,7 @@ /* empty declarations, the API does not expose these */ struct jlinger; struct joper; +struct autosync_cfg; /* the main file structure */ struct jfs { @@ -37,8 +38,10 @@ struct jfs { unsigned int *jmap; /* journal's lock file mmap area */ uint32_t flags; /* journal flags */ struct jlinger *ltrans; /* lingered transactions */ + size_t ltrans_len; /* length of all the lingered transactions */ pthread_mutex_t ltlock; /* lingered transactions' lock */ pthread_mutex_t lock; /* a soft lock used in some operations */ + struct autosync_cfg *as_cfg; /* autosync config */ }; /* a transaction */ @@ -74,6 +77,10 @@ int jsync(struct jfs *fs); int jmove_journal(struct jfs *fs, const char *newpath); int jclose(struct jfs *fs); +/* autosync */ +int jfs_autosync_start(struct jfs *fs, time_t max_sec, size_t max_bytes); +int jfs_autosync_stop(struct jfs *fs); + /* journal checker */ int jfsck(const char *name, const char *jdir, struct jfsck_result *res); diff --git a/libjio/trans.c b/libjio/trans.c index e4a6e98..34bb33c 100644 --- a/libjio/trans.c +++ b/libjio/trans.c @@ -217,6 +217,8 @@ ssize_t jtrans_commit(struct jtrans *ts) pthread_mutex_lock(&(ts->fs->ltlock)); linger->next = ts->fs->ltrans; ts->fs->ltrans = linger; + ts->fs->ltrans_len += written; + autosync_check(ts->fs); pthread_mutex_unlock(&(ts->fs->ltlock)); } else { if (have_sync_range) { @@ -390,6 +392,7 @@ int jopen(struct jfs *fs, const char *name, int flags, int mode, int jflags) fs->jdir = NULL; fs->jdirfd = -1; fs->jmap = MAP_FAILED; + fs->as_cfg = NULL; /* we provide either read-only or read-write access, because when we * commit a transaction we read the current contents before applying, @@ -407,6 +410,7 @@ int jopen(struct jfs *fs, const char *name, int flags, int mode, int jflags) fs->name = strdup(name); fs->flags = jflags; fs->ltrans = NULL; + fs->ltrans_len = 0; /* Note on fs->lock usage: this lock is used only to protect the file * pointer. This means that it must only be held while performing @@ -500,12 +504,11 @@ int jsync(struct jfs *fs) if (fs->fd < 0) return -1; - rv = fsync(fs->fd); + rv = fdatasync(fs->fd); if (rv != 0) return rv; pthread_mutex_lock(&(fs->ltlock)); - ltmp = fs->ltrans; while (fs->ltrans != NULL) { fiu_exit_on("jio/jsync/pre_unlink"); journal_free(fs->ltrans->jop); @@ -515,6 +518,7 @@ int jsync(struct jfs *fs) fs->ltrans = ltmp; } + fs->ltrans_len = 0; pthread_mutex_unlock(&(fs->ltlock)); return 0; } @@ -588,6 +592,9 @@ int jclose(struct jfs *fs) ret = 0; + if (jfs_autosync_stop(fs)) + ret = -1; + if (! (fs->flags & J_RDONLY)) { if (jsync(fs)) ret = -1;