author | Alberto Bertogli
<albertito@blitiri.com.ar> 2009-08-04 16:45:57 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2009-08-04 17:45:52 UTC |
parent | a97577e4f5f6df3860e3b73b5ce6da378fd77c5a |
UPGRADING | +3 | -0 |
bindings/python/libjio.c | +101 | -8 |
doc/guide.rst | +8 | -5 |
libjio/journal.c | +7 | -6 |
libjio/libjio.3 | +31 | -18 |
libjio/libjio.h | +43 | -11 |
libjio/trans.c | +130 | -69 |
libjio/trans.h | +38 | -11 |
libjio/unix.c | +12 | -18 |
samples/full.c | +2 | -2 |
samples/jio3.c | +4 | -4 |
tests/behaviour/t_fi.py | +10 | -10 |
tests/behaviour/t_normal.py | +49 | -18 |
diff --git a/UPGRADING b/UPGRADING index 884e8f8..0db68b4 100644 --- a/UPGRADING +++ b/UPGRADING @@ -14,6 +14,9 @@ take much effort. When it's mandatory, it will be noted. - jopen() jflags parameter is now unsigned. - J_NOCLEANUP was removed in favour of J_CLEANUP, and the default behaviour of jfsck() is now not to clean up unless J_CLEANUP is passed in the flags. + - jtrans_add() renamed to jtrans_add_w(). + - jtrans_commit() returns 0 on success, instead of the amount of bytes + written. -> 0.50 (Big API change) - Structures are now opaque types: diff --git a/bindings/python/libjio.c b/bindings/python/libjio.c index 4e04366..46b257d 100644 --- a/bindings/python/libjio.c +++ b/bindings/python/libjio.c @@ -50,6 +50,10 @@ typedef struct { PyObject_HEAD jtrans_t *ts; jfile_object *jfile; + + /* add_r() allocates views which must be freed by the destructor */ + Py_buffer **views; + size_t nviews; } jtrans_object; static PyTypeObject jtrans_type; @@ -562,6 +566,9 @@ static PyObject *jf_new_trans(jfile_object *fp, PyObject *args) tp->jfile = fp; Py_INCREF(fp); + tp->views = NULL; + tp->nviews = 0; + return (PyObject *) tp; } @@ -629,34 +636,119 @@ static void jt_dealloc(jtrans_object *tp) jtrans_free(tp->ts); } Py_DECREF(tp->jfile); + + /* release views allocated by add_r */ + while (tp->nviews) { + PyBuffer_Release(tp->views[tp->nviews - 1]); + free(tp->views[tp->nviews - 1]); + tp->nviews--; + } + free(tp->views); + PyObject_Del(tp); } -/* add */ -PyDoc_STRVAR(jt_add__doc, -"add(buf, offset)\n\ +/* add_w */ +PyDoc_STRVAR(jt_add_w__doc, +"add_w(buf, offset)\n\ \n\ Add an operation to write the given buffer at the given offset to the\n\ transaction.\n\ -It's a wrapper to jtrans_add().\n"); +It's a wrapper to jtrans_add_w().\n"); -static PyObject *jt_add(jtrans_object *tp, PyObject *args) +static PyObject *jt_add_w(jtrans_object *tp, PyObject *args) { long rv; int len; long long offset; unsigned char *buf; - if (!PyArg_ParseTuple(args, "s#L:add", &buf, &len, &offset)) + if (!PyArg_ParseTuple(args, "s#L:add_w", &buf, &len, &offset)) return NULL; - rv = jtrans_add(tp->ts, buf, len, offset); + rv = jtrans_add_w(tp->ts, buf, len, offset); if (rv < 0) return PyErr_SetFromErrno(PyExc_IOError); return PyLong_FromLong(rv); } +/* add_r */ +PyDoc_STRVAR(jt_add_r__doc, +"add_r(buf, offset)\n\ +\n\ +Add an operation to read into the given buffer at the given offset to the\n\ +transaction.\n\ +It's a wrapper to jtrans_add_r().\n\ +\n\ +The buffer must be objects that support slice assignment, like bytearray\n\ +(but *not* str).\n\ +Only available in Python >= 2.6.\n"); + +/* add_r requires the new Py_buffer interface, which is only available in + * Python >= 2.6 */ +#if PY_MAJOR_VERSION >= 3 || (PY_MAJOR_VERSION == 2 && PY_MINOR_VERSION >= 6) +static PyObject *jt_add_r(jtrans_object *tp, PyObject *args) +{ + long rv; + PyObject *py_buf; + unsigned long long offset; + Py_buffer *view = NULL, **new_views; + + if (!PyArg_ParseTuple(args, "OL:add_r", &py_buf, &offset)) + return NULL; + + if (!PyObject_CheckBuffer(py_buf)) { + PyErr_SetString(PyExc_TypeError, + "object must support the buffer interface"); + return NULL; + } + + view = malloc(sizeof(Py_buffer)); + if (view == NULL) + return PyErr_NoMemory(); + + if (PyObject_GetBuffer(py_buf, view, PyBUF_CONTIG)) { + free(view); + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + rv = jtrans_add_r(tp->ts, view->buf, view->len, offset); + Py_END_ALLOW_THREADS + + if (rv < 0) { + PyBuffer_Release(view); + free(view); + return PyErr_SetFromErrno(PyExc_IOError); + } + + new_views = realloc(tp->views, sizeof(Py_buffer *) * tp->nviews + 1); + if (new_views == NULL) { + PyBuffer_Release(view); + free(view); + return PyErr_NoMemory(); + } + + tp->nviews++; + tp->views = new_views; + tp->views[tp->nviews - 1] = view; + + return PyLong_FromLong(rv); +} + +#else + +static PyObject *jt_add_r(jtrans_object *tp, PyObject *args) +{ + PyErr_SetString(PyExc_NotImplementedError, + "only supported in Python >= 2.6"); + return NULL; +} + +#endif /* python version >= 2.6 */ + + /* commit */ PyDoc_STRVAR(jt_commit__doc, "commit()\n\ @@ -707,7 +799,8 @@ static PyObject *jt_rollback(jtrans_object *tp, PyObject *args) /* method table */ static PyMethodDef jtrans_methods[] = { - { "add", (PyCFunction) jt_add, METH_VARARGS, jt_add__doc }, + { "add_r", (PyCFunction) jt_add_r, METH_VARARGS, jt_add_r__doc }, + { "add_w", (PyCFunction) jt_add_w, METH_VARARGS, jt_add_w__doc }, { "commit", (PyCFunction) jt_commit, METH_VARARGS, jt_commit__doc }, { "rollback", (PyCFunction) jt_rollback, METH_VARARGS, jt_rollback__doc }, { NULL } diff --git a/doc/guide.rst b/doc/guide.rst index 7d5f5fb..df9e939 100644 --- a/doc/guide.rst +++ b/doc/guide.rst @@ -75,9 +75,9 @@ Now that you have opened a file, the next thing to do would be to create a transaction. This is what *jtrans_new()* is for: it takes a file structure and returns a new transaction structure. -To add an operation to the transaction, use *jtrans_add()*. You can add as -many operations as you want. Operations within a transaction may overlap, and -will be applied in order. +To add a write operation to the transaction, use *jtrans_add_w()*. You can add +as many operations as you want. Operations within a transaction may overlap, +and will be applied in order. Finally, to apply our transaction to the file, use *jtrans_commit()*. @@ -93,7 +93,7 @@ are ignored for simplicity):: file = jopen("filename", O_RDWR | O_CREAT, 0600, 0); trans = jtrans_new(file, 0); - jtrans_add(trans, buf, strlen(buf), 0); + jtrans_add_w(trans, buf, strlen(buf), 0); jtrans_commit(trans); jtrans_free(trans); @@ -101,7 +101,7 @@ are ignored for simplicity):: As we've seen, you open the file and initialize the structure with *jopen()* (with the parameter *jflags* being the last 0), create a new transaction with -*jtrans_new()*, then add an operation with *jtrans_add()* (the last 0 is the +*jtrans_new()*, then add an operation with *jtrans_add_w()* (the last 0 is the offset, in this case the beginning of the file), commit the transaction with *jtrans_commit()*, free it with *jtrans_free()*, and finally close the file with *jclose()*. @@ -111,6 +111,9 @@ Reading is much easier: the library provides three functions, *jread()*, *readv()*, except that they play safe with libjio's writing code. You should use these to read from files when using libjio. +You can also add read operations to a transaction using *jtrans_add_r()*, and +the data will be read atomically at commit time. + Integrity checking and recovery ------------------------------- diff --git a/libjio/journal.c b/libjio/journal.c index 570c8f0..7c8d9bf 100644 --- a/libjio/journal.c +++ b/libjio/journal.c @@ -491,7 +491,6 @@ exit: int fill_trans(unsigned char *map, off_t len, struct jtrans *ts) { int rv; - unsigned int numops; unsigned char *p; struct operation *op, *tmp; struct on_disk_hdr hdr; @@ -514,8 +513,10 @@ int fill_trans(unsigned char *map, off_t len, struct jtrans *ts) ts->id = hdr.trans_id; ts->flags = hdr.flags; + ts->numops_r = 0; + ts->numops_w = 0; + ts->len_w = 0; - numops = 0; for (;;) { if (p + sizeof(ophdr) > map + len) goto error; @@ -539,6 +540,7 @@ int fill_trans(unsigned char *map, off_t len, struct jtrans *ts) op->len = ophdr.len; op->offset = ophdr.offset; + op->direction = D_WRITE; op->buf = (void *) p; p += op->len; @@ -557,7 +559,8 @@ int fill_trans(unsigned char *map, off_t len, struct jtrans *ts) op->next = NULL; } - numops++; + ts->numops_w++; + ts->len_w += op->len; } if (p + sizeof(trailer) > map + len) @@ -568,11 +571,9 @@ int fill_trans(unsigned char *map, off_t len, struct jtrans *ts) trailer_ntoh(&trailer); - if (trailer.numops != numops) + if (trailer.numops != ts->numops_w) goto error; - ts->numops = numops; - if (checksum_buf(0, map, len - sizeof(trailer)) != trailer.checksum) { rv = -2; goto error; diff --git a/libjio/libjio.3 b/libjio/libjio.3 index e0aa316..6e946fb 100644 --- a/libjio/libjio.3 +++ b/libjio/libjio.3 @@ -23,7 +23,9 @@ libjio - A library for Journaled I/O .BI "jtrans_t *jtrans_new(jfs_t *" fs ", unsigned int " flags ");" .BI "int jtrans_commit(jtrans_t *" ts ");" -.BI "int jtrans_add(jtrans_t *" ts ", const void *" buf "," +.BI "int jtrans_add_r(jtrans_t *" ts ", void *" buf "," +.BI " size_t " count ", off_t " offset ");" +.BI "int jtrans_add_w(jtrans_t *" ts ", const void *" buf "," .BI " size_t " count ", off_t " offset ");" .BI "int jtrans_rollback(jtrans_t *" ts ");" .BI "void jtrans_free(jtrans_t *" ts ");" @@ -79,8 +81,8 @@ interfaces to and friends. The basic functions consist of -.BR jtrans_new() ", " jtrans_add() ", " jtrans_commit() " and " -.BR jtrans_rollback() . +.BR jtrans_new() ", " jtrans_add_r() ", " jtrans_add_w() ", " +.BR jtrans_commit() " and " jtrans_rollback() . They provide a lower-level method for manipulating transactions. .SS TYPES AND STRUCTURES @@ -174,9 +176,8 @@ semantics and behave the same way. .SS BASIC FUNCTIONS The basic functions are the ones which manipulate transactions directly: -.BR jtrans_new() ", " jtrans_add() ", " jtrans_commit() ", " jtrans_rollback() -and -.BR jtrans_free() . +.BR jtrans_new() ", " jtrans_add_r() ", " jtrans_add_w() ", " +.BR jtrans_commit() ", " jtrans_rollback() " and " jtrans_free()" . These are intended to be use when your application requires direct control over the transactions. @@ -190,22 +191,34 @@ is not a disk operation, but only frees the pointers that were previously allocated by the library; all disk operations are performed by the other two functions. -.B jtrans_add() -is used to add operations to a transaction, and it takes the same parameters -as +You can add multiple read and write operations to a transaction, and they will +be applied in order. + +.B jtrans_add_w() +is used to add write operations to a transaction, and it takes the same +parameters as .BR pwrite() : a buffer, its length and the offset where it should be applied, and adds it to -the transaction. You can add multiple operations to a transaction, and they -will be applied in order. +the transaction. The buffer is copied internally and can be free()d right +after this function returns. + +.B jtrans_add_r() +is used to add read operations to a transaction, and it takes the same +parameters as +.BR pread() : +a buffer, its length and the offset where it should read from, and adds it to +the transaction. Note that if there is not enough data in the file to read +the specified amount of bytes, the commit will fail, so do not attempt to read +beyond EOF (you can use jread() for that purpose). .B jtrans_commit() -commits the given transaction to disk. After it has returned, data has been -saved to the disk. The commit operation is atomic with regards to other read -or write operations on different processes, as long as they all access it via -libjio. It returns the number of bytes written, -1 if there was an error but -atomic warantees were preserved, or -2 if there was an error and there is a -possible break of atomic warantees (which is an indication of a severe -underlying condition). +commits the given transaction to disk. After it has returned, write operations +have been saved to the disk, and read operations have been read from it. The +commit operation is atomic with regards to other read or write operations on +different processes, as long as they all access it via libjio. It returns the +number 0 on success, -1 if there was an error but atomic warantees were +preserved, or -2 if there was an error and there is a possible break of atomic +warantees (which is an indication of a severe underlying condition). .B jtrans_rollback() reverses a transaction that was applied with diff --git a/libjio/libjio.h b/libjio/libjio.h index f1dc65d..0476c6e 100644 --- a/libjio/libjio.h +++ b/libjio/libjio.h @@ -148,10 +148,10 @@ int jsync(jfs_t *fs); */ jtrans_t *jtrans_new(jfs_t *fs, unsigned int flags); -/** Add an operation to a transaction. +/** Add a write operation to a transaction. * - * An operation consists of a buffer, its length, and the offset to write it - * to. + * A write operation consists of a buffer, its length, and the offset to write + * it to. * * The file will not be touched (not even locked) until commit time, where the * first count bytes of buf will be written at offset. @@ -159,6 +159,9 @@ jtrans_t *jtrans_new(jfs_t *fs, unsigned int flags); * Transactions will be applied in order, and overlapping operations are * permitted, in which case the latest one will prevail. * + * The buffer will be copied internally and can be free()d right after this + * function returns. + * * @param ts transaction * @param buf buffer to write * @param count how many bytes from the buffer to write @@ -166,22 +169,50 @@ jtrans_t *jtrans_new(jfs_t *fs, unsigned int flags); * @returns 0 on success, -1 on error * @ingroup basic */ -int jtrans_add(jtrans_t *ts, const void *buf, size_t count, off_t offset); +int jtrans_add_w(jtrans_t *ts, const void *buf, size_t count, off_t offset); + +/** Add a read operation to a transaction. + * + * An operation consists of a buffer, its length, and the offset to read it + * from. + * + * The file will not be touched (not even locked) until commit time, where the + * first count bytes at offset will be read into buf. + * + * Note that if there is not enough data in the file to read the specified + * amount of bytes, the commit will fail, so do not attempt to read beyond EOF + * (you can use jread() for that purpose). + * + * Transactions will be applied in order, and overlapping operations are + * permitted, in which case the latest one will prevail. + * + * In case of an error in jtrans_commit(), the contents of the buffer are + * undefined. + * + * @param ts transaction + * @param buf buffer to read to + * @param count how many bytes to read + * @param offset offset to read at + * @returns 0 on success, -1 on error + * @ingroup basic + * @see jread() + */ +int jtrans_add_r(jtrans_t *ts, void *buf, size_t count, off_t offset); /** Commit a transaction. * - * All the operations added to it using jtrans_add() will be written to disk, - * in the same order they were added. + * All the operations added to it using jtrans_add_w()/jtrans_add_r() will be + * written to/read from disk, in the same order they were added. * * After this function returns successfully, all the data can be trusted to be * on the disk. The commit is atomic with regards to other processes using * libjio, but not accessing directly to the file. * * @param ts transaction - * @returns the amount of bytes written to disk, or -1 if there was an error - * but atomic warranties were preserved, or -2 if there was an error and - * there is a possible break of atomic warranties (which is an indication - * of a severe underlying condition). + * @returns 0 on success, or -1 if there was an error but atomic warranties + * were preserved, or -2 if there was an error and there is a possible + * break of atomic warranties (which is an indication of a severe + * underlying condition). * @ingroup basic */ ssize_t jtrans_commit(jtrans_t *ts); @@ -189,7 +220,8 @@ ssize_t jtrans_commit(jtrans_t *ts); /** Rollback a transaction. * * This function atomically undoes a previous committed transaction. After its - * successful return, the data can be trusted to be on disk. + * successful return, the data can be trusted to be on disk. The read + * operations will be ignored. * * Use with care. * diff --git a/libjio/trans.c b/libjio/trans.c index f8aa491..91970d7 100644 --- a/libjio/trans.c +++ b/libjio/trans.c @@ -41,8 +41,9 @@ struct jtrans *jtrans_new(struct jfs *fs, unsigned int flags) ts->id = 0; ts->flags = fs->flags | flags; ts->op = NULL; - ts->numops = 0; - ts->len = 0; + ts->numops_r = 0; + ts->numops_w = 0; + ts->len_w = 0; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); @@ -62,7 +63,7 @@ void jtrans_free(struct jtrans *ts) while (ts->op != NULL) { tmpop = ts->op->next; - if (ts->op->buf) + if (ts->op->buf && ts->op->direction == D_WRITE) free(ts->op->buf); if (ts->op->pdata) free(ts->op->pdata); @@ -134,87 +135,107 @@ static int operation_read_prev(struct jtrans *ts, struct operation *op) return 0; } -/* Add an operation to a transaction */ -int jtrans_add(struct jtrans *ts, const void *buf, size_t count, off_t offset) +/** Common function to add an operation to a transaction */ +static int jtrans_add_common(struct jtrans *ts, const void *buf, size_t count, + off_t offset, enum op_direction direction) { struct operation *op, *tmpop; + op = tmpop = NULL; + pthread_mutex_lock(&(ts->lock)); - /* fail for read-only accesses */ - if (ts->flags & J_RDONLY) { - pthread_mutex_unlock(&(ts->lock)); - return -1; - } + /* Writes are not allowed in read-only mode, they fail early */ + if ((ts->flags & J_RDONLY) && direction == D_WRITE) + goto error; - /* fail for 0 length operations */ - if (count == 0) { - pthread_mutex_unlock(&(ts->lock)); - return -1; - } + if (count == 0) + goto error; - if ((long long) ts->len + count > MAX_TSIZE) { - pthread_mutex_unlock(&(ts->lock)); - return -1; + if ((long long) ts->len_w + count > MAX_TSIZE) + goto error; + + op = malloc(sizeof(struct operation)); + if (op == NULL) + goto error; + + if (direction == D_WRITE) { + op->buf = malloc(count); + if (op->buf == NULL) + goto error; + + ts->numops_w++; + } else { + ts->numops_r++; } - /* find the last operation in the transaction and create a new one at - * the end */ + /* add op to the end of the linked list */ + op->next = NULL; if (ts->op == NULL) { - ts->op = malloc(sizeof(struct operation)); - if (ts->op == NULL) { - pthread_mutex_unlock(&(ts->lock)); - return -1; - } - op = ts->op; + ts->op = op; op->prev = NULL; } else { for (tmpop = ts->op; tmpop->next != NULL; tmpop = tmpop->next) ; - tmpop->next = malloc(sizeof(struct operation)); - if (tmpop->next == NULL) { - pthread_mutex_unlock(&(ts->lock)); - return -1; - } - tmpop->next->prev = tmpop; - op = tmpop->next; + tmpop->next = op; + op->prev = tmpop; } - op->buf = malloc(count); - if (op->buf == NULL) { - /* remove from the list and fail */ - if (op->prev == NULL) { - ts->op = NULL; - } else { - op->prev->next = op->next; - } - free(op); - pthread_mutex_unlock(&(ts->lock)); - return -1; - } - - ts->numops++; - ts->len += count; - pthread_mutex_unlock(&(ts->lock)); - memcpy(op->buf, buf, count); op->len = count; op->offset = offset; - op->next = NULL; op->plen = 0; op->pdata = NULL; op->locked = 0; + op->direction = direction; - if (!(ts->flags & J_NOROLLBACK)) { - /* jtrans_commit() will want to read the current data, so we - * tell the kernel about that */ + if (direction == D_WRITE) { + memcpy(op->buf, buf, count); + + if (!(ts->flags & J_NOROLLBACK)) { + /* jtrans_commit() will want to read the current data, + * so we tell the kernel about that */ + posix_fadvise(ts->fs->fd, offset, count, + POSIX_FADV_WILLNEED); + } + } else { + /* this casts the const away, which is ugly but let us have a + * common read/write path and avoid useless code repetition + * just to handle it */ + op->buf = (void *) buf; + + /* if there are no overlapping writes, jtrans_commit() will + * want to read the data from the disk; and if there are we + * will already have submitted a request and one more won't + * hurt */ posix_fadvise(ts->fs->fd, offset, count, POSIX_FADV_WILLNEED); } return 0; + +error: + pthread_mutex_unlock(&(ts->lock)); + + if (op && direction == D_WRITE) + free(op->buf); + free(op); + + return -1; +} + +int jtrans_add_r(struct jtrans *ts, void *buf, size_t count, off_t offset) +{ + return jtrans_add_common(ts, buf, count, offset, D_READ); } +int jtrans_add_w(struct jtrans *ts, const void *buf, size_t count, + off_t offset) +{ + return jtrans_add_common(ts, buf, count, offset, D_WRITE); +} + + /* Commit a transaction */ ssize_t jtrans_commit(struct jtrans *ts) { @@ -230,16 +251,25 @@ ssize_t jtrans_commit(struct jtrans *ts) ts->flags = ts->flags & ~J_COMMITTED; ts->flags = ts->flags & ~J_ROLLBACKED; - /* fail for read-only accesses */ - if (ts->flags & J_RDONLY) + if (ts->numops_r + ts->numops_w == 0) goto exit; - /* create and fill the transaction file */ - jop = journal_new(ts->fs, ts->flags); - if (jop == NULL) + /* fail for read-only accesses if we have write operations */ + if (ts->numops_w && (ts->flags & J_RDONLY)) goto exit; + /* create and fill the transaction file only if we have at least one + * write operation */ + if (ts->numops_w) { + jop = journal_new(ts->fs, ts->flags); + if (jop == NULL) + goto exit; + } + for (op = ts->op; op != NULL; op = op->next) { + if (op->direction == D_READ) + continue; + r = journal_add_op(jop, op->buf, op->len, op->offset); if (r != 0) goto unlink_exit; @@ -247,7 +277,8 @@ ssize_t jtrans_commit(struct jtrans *ts) fiu_exit_on("jio/commit/tf_opdata"); } - journal_pre_commit(jop); + if (jop) + journal_pre_commit(jop); fiu_exit_on("jio/commit/tf_data"); @@ -260,19 +291,34 @@ ssize_t jtrans_commit(struct jtrans *ts) if (!(ts->flags & J_NOROLLBACK)) { for (op = ts->op; op != NULL; op = op->next) { + if (op->direction == D_READ) + continue; + r = operation_read_prev(ts, op); if (r < 0) goto unlink_exit; } } - r = journal_commit(jop); - if (r < 0) - goto unlink_exit; + if (jop) { + r = journal_commit(jop); + if (r < 0) + goto unlink_exit; + } /* now that we have a safe transaction file, let's apply it */ written = 0; for (op = ts->op; op != NULL; op = op->next) { + if (op->direction == D_READ) { + r = spread(ts->fs->fd, op->buf, op->len, op->offset); + if (r != op->len) + goto rollback_exit; + + continue; + } + + /* from now on, write ops (which are more interesting) */ + r = spwrite(ts->fs->fd, op->buf, op->len, op->offset); if (r != op->len) goto rollback_exit; @@ -291,7 +337,7 @@ ssize_t jtrans_commit(struct jtrans *ts) fiu_exit_on("jio/commit/wrote_all_ops"); - if (ts->flags & J_LINGER) { + if (jop && (ts->flags & J_LINGER)) { struct jlinger *lp; linger = malloc(sizeof(struct jlinger)); @@ -320,9 +366,12 @@ ssize_t jtrans_commit(struct jtrans *ts) /* Leave the journal_free() up to jsync() */ jop = NULL; - } else { + } else if (jop) { if (have_sync_range) { for (op = ts->op; op != NULL; op = op->next) { + if (op->direction == D_READ) + continue; + r = sync_range_wait(ts->fs->fd, op->len, op->offset); if (r != 0) @@ -336,10 +385,12 @@ ssize_t jtrans_commit(struct jtrans *ts) /* mark the transaction as committed */ ts->flags = ts->flags | J_COMMITTED; - retval = written; + + retval = 1; rollback_exit: /* If the transaction failed we try to recover by rolling it back. + * Only used if it has at least one write operation. * * NOTE: on extreme conditions (ENOSPC/disk failure) this can fail * too! There's nothing much we can do in that case, the caller should @@ -347,7 +398,8 @@ rollback_exit: * * Transactions that were successfuly recovered by rolling them back * will have J_ROLLBACKED in their flags. */ - if (!(ts->flags & J_COMMITTED) && !(ts->flags & J_ROLLBACKING)) { + if (jop && !(ts->flags & J_COMMITTED) && + !(ts->flags & J_ROLLBACKING)) { r = ts->flags; ts->flags = ts->flags | J_NOLOCK | J_ROLLBACKING; if (jtrans_rollback(ts) >= 0) { @@ -404,7 +456,9 @@ ssize_t jtrans_rollback(struct jtrans *ts) return -1; newts->flags = ts->flags; - newts->numops = ts->numops; + newts->numops_r = 0; + newts->numops_w = 0; + newts->len_w = 0; if (ts->op == NULL || ts->flags & J_NOROLLBACK) { rv = -1; @@ -415,8 +469,11 @@ ssize_t jtrans_rollback(struct jtrans *ts) for (op = ts->op; op->next != NULL; op = op->next) ; - /* and traverse the list backwards */ + /* and traverse the list backwards, skipping read operations */ for ( ; op != NULL; op = op->prev) { + if (op->direction == D_READ) + continue; + /* if we extended the data in the previous transaction, we * should truncate it back */ /* DANGEROUS: this is one of the main reasons why rollbacking @@ -442,8 +499,12 @@ ssize_t jtrans_rollback(struct jtrans *ts) curop->buf = op->pdata; curop->plen = op->plen; curop->pdata = op->pdata; + curop->direction = op->direction; curop->locked = 0; + newts->numops_w++; + newts->len_w += curop->len; + /* add the new transaction to the list */ if (newts->op == NULL) { newts->op = curop; diff --git a/libjio/trans.h b/libjio/trans.h index 33238bd..466c2d9 100644 --- a/libjio/trans.h +++ b/libjio/trans.h @@ -15,11 +15,14 @@ struct jtrans { /** Transaction flags */ uint32_t flags; - /** Number of operations in the list */ - unsigned int numops; + /** Number of read operations in the list */ + unsigned int numops_r; - /** Transaction's length */ - size_t len; + /** Number of write operations in the list */ + unsigned int numops_w; + + /** Sum of the lengths of the write operations */ + size_t len_w; /** Lock that protects the list of operations */ pthread_mutex_t lock; @@ -28,15 +31,39 @@ struct jtrans { struct operation *op; }; -/* a single operation */ +/** Possible operation directions */ +enum op_direction { + D_READ = 1, + D_WRITE = 2, +}; + +/** A single operation */ struct operation { - int locked; /* is the region is locked? */ - off_t offset; /* operation's offset */ - size_t len; /* data length */ - void *buf; /* data */ - size_t plen; /* previous data length */ - void *pdata; /* previous data */ + /** Is the region locked? */ + int locked; + + /** Operation's offset */ + off_t offset; + + /** Data length, in bytes */ + size_t len; + + /** Data buffer */ + void *buf; + + /** Direction */ + enum op_direction direction; + + /** Previous data length (only if direction == D_WRITE) */ + size_t plen; + + /** Previous data (only if direction == D_WRITE) */ + void *pdata; + + /** Previous operation */ struct operation *prev; + + /** Next operation */ struct operation *next; }; diff --git a/libjio/unix.c b/libjio/unix.c index 33aeadf..076dbb7 100644 --- a/libjio/unix.c +++ b/libjio/unix.c @@ -32,10 +32,8 @@ ssize_t jread(struct jfs *fs, void *buf, size_t count) rv = spread(fs->fd, buf, count, pos); plockf(fs->fd, F_UNLOCK, pos, count); - if (rv > 0) { - /* if success, advance the file pointer */ + if (rv > 0) lseek(fs->fd, rv, SEEK_CUR); - } pthread_mutex_unlock(&(fs->lock)); @@ -97,16 +95,14 @@ ssize_t jwrite(struct jfs *fs, const void *buf, size_t count) else pos = lseek(fs->fd, 0, SEEK_CUR); - rv = jtrans_add(ts, buf, count, pos); + rv = jtrans_add_w(ts, buf, count, pos); if (rv < 0) goto exit; rv = jtrans_commit(ts); - if (rv > 0) { - /* if success, advance the file pointer */ - lseek(fs->fd, rv, SEEK_CUR); - } + if (rv >= 0) + lseek(fs->fd, count, SEEK_CUR); exit: @@ -114,7 +110,7 @@ exit: jtrans_free(ts); - return rv; + return (rv >= 0) ? count : rv; } /* pwrite() wrapper */ @@ -127,7 +123,7 @@ ssize_t jpwrite(struct jfs *fs, const void *buf, size_t count, off_t offset) if (ts == NULL) return -1; - rv = jtrans_add(ts, buf, count, offset); + rv = jtrans_add_w(ts, buf, count, offset); if (rv < 0) goto exit; @@ -136,7 +132,7 @@ ssize_t jpwrite(struct jfs *fs, const void *buf, size_t count, off_t offset) exit: jtrans_free(ts); - return rv; + return (rv >= 0) ? count : rv; } /* writev() wrapper */ @@ -163,7 +159,8 @@ ssize_t jwritev(struct jfs *fs, const struct iovec *vector, int count) sum = 0; for (i = 0; i < count; i++) { - rv = jtrans_add(ts, vector[i].iov_base, vector[i].iov_len, t); + rv = jtrans_add_w(ts, vector[i].iov_base, + vector[i].iov_len, t); if (rv < 0) goto exit; @@ -173,18 +170,15 @@ ssize_t jwritev(struct jfs *fs, const struct iovec *vector, int count) rv = jtrans_commit(ts); - if (rv > 0) { - /* if success, advance the file pointer */ - lseek(fs->fd, rv, SEEK_CUR); - } + if (rv >= 0) + lseek(fs->fd, sum, SEEK_CUR); exit: pthread_mutex_unlock(&(fs->lock)); jtrans_free(ts); - return rv; - + return (rv >= 0) ? sum : rv; } /* Truncate a file. Be careful with this */ diff --git a/samples/full.c b/samples/full.c index 701a155..5da9af8 100644 --- a/samples/full.c +++ b/samples/full.c @@ -28,8 +28,8 @@ int main(void) /* write two "Hello world"s next to each other */ trans = jtrans_new(file, 0); - jtrans_add(trans, TEXT, strlen(TEXT), 0); - jtrans_add(trans, TEXT, strlen(TEXT), strlen(TEXT)); + jtrans_add_w(trans, TEXT, strlen(TEXT), 0); + jtrans_add_w(trans, TEXT, strlen(TEXT), strlen(TEXT)); r = jtrans_commit(trans); if (r < 0) { perror("jtrans_commit"); diff --git a/samples/jio3.c b/samples/jio3.c index 6731cd5..b5a6bd3 100644 --- a/samples/jio3.c +++ b/samples/jio3.c @@ -24,16 +24,16 @@ int main(int argc, char **argv) perror("jtrans_new()"); #define str1 "1ROLLBACKTEST1!\n" - jtrans_add(ts, str1, strlen(str1), 0); + jtrans_add_w(ts, str1, strlen(str1), 0); #define str2 "2ROLLBACKTEST2!\n" - jtrans_add(ts, str2, strlen(str2), strlen(str1)); + jtrans_add_w(ts, str2, strlen(str2), strlen(str1)); #define str3 "3ROLLBACKTEST3!\n" - jtrans_add(ts, str3, strlen(str3), strlen(str1) + strlen(str2)); + jtrans_add_w(ts, str3, strlen(str3), strlen(str1) + strlen(str2)); rv = jtrans_commit(ts); - if (rv != strlen(str1) + strlen(str2) + strlen(str3)) + if (rv < 0) perror("jtrans_commit()"); printf("commit ok: %d\n", rv); diff --git a/tests/behaviour/t_fi.py b/tests/behaviour/t_fi.py index 332b82f..419030d 100644 --- a/tests/behaviour/t_fi.py +++ b/tests/behaviour/t_fi.py @@ -72,8 +72,8 @@ def test_f04(): fiu.enable_external("jio/commit/tf_pre_addop", gen_ret_seq((0, 1))) t = jf.new_trans() - t.add(c, 0) - t.add(c, len(c) + 200) + t.add_w(c, 0) + t.add_w(c, len(c) + 200) t.commit() n = run_with_tmp(f1) @@ -92,8 +92,8 @@ def test_f05(): fiu.enable_external("jio/commit/tf_opdata", gen_ret_seq((0, 1))) t = jf.new_trans() - t.add(c, 0) - t.add(c, len(c) + 200) + t.add_w(c, 0) + t.add_w(c, len(c) + 200) t.commit() n = run_with_tmp(f1) @@ -111,8 +111,8 @@ def test_f06(): def f1(f, jf): fiu.enable("jio/commit/tf_data") t = jf.new_trans() - t.add(c, 0) - t.add(c, len(c) + 200) + t.add_w(c, 0) + t.add_w(c, len(c) + 200) t.commit() n = run_with_tmp(f1) @@ -144,8 +144,8 @@ def test_f08(): def f1(f, jf): fiu.enable("jio/commit/wrote_op") t = jf.new_trans() - t.add(c, 0) - t.add(c, len(c) + 200) + t.add_w(c, 0) + t.add_w(c, len(c) + 200) t.commit() n = run_with_tmp(f1) @@ -192,7 +192,7 @@ def test_f11(): def f1(f, jf): jf.write('x' * (80 + len(c))) t = jf.new_trans() - t.add(c, 80) + t.add_w(c, 80) t.commit() assert content(f.name) == 'x' * 80 + c fiu.enable("jio/commit/tf_sync") @@ -212,7 +212,7 @@ def test_f12(): def f1(f, jf): fiu.enable("jio/jsync/pre_unlink") t = jf.new_trans() - t.add(c, 0) + t.add_w(c, 0) t.commit() jf.jsync() diff --git a/tests/behaviour/t_normal.py b/tests/behaviour/t_normal.py index c5d608c..a9ba8df 100644 --- a/tests/behaviour/t_normal.py +++ b/tests/behaviour/t_normal.py @@ -22,7 +22,8 @@ def test_n02(): c = gencontent() def f1(f, jf): - jf.write(c) + jf.write(c[:len(c) / 2]) + jf.write(c[len(c) / 2:]) jf.lseek(0, 0) assert jf.read(len(c) * 2) == c @@ -59,7 +60,7 @@ def test_n05(): def f1(f, jf): t = jf.new_trans() - t.add(c, 80) + t.add_w(c, 80) t.commit() n = run_with_tmp(f1) @@ -73,7 +74,7 @@ def test_n06(): def f1(f, jf): t = jf.new_trans() - t.add(c, 80) + t.add_w(c, 80) t.commit() t.rollback() @@ -94,7 +95,7 @@ def test_n07(): def f1(f, jf): jf.write(c1) t = jf.new_trans() - t.add(c2, len(c1) - 973) + t.add_w(c2, len(c1) - 973) t.commit() t.rollback() @@ -115,10 +116,10 @@ def test_n08(): def f1(f, jf): jf.write(c1) t = jf.new_trans() - t.add(c2, len(c1) - 973) - t.add(c3, len(c1) - 1041) - t.add(c4, len(c1) - 666) - t.add(c5, len(c1) - 3000) + t.add_w(c2, len(c1) - 973) + t.add_w(c3, len(c1) - 1041) + t.add_w(c4, len(c1) - 666) + t.add_w(c5, len(c1) - 3000) t.commit() n = run_with_tmp(f1) @@ -137,10 +138,10 @@ def test_n09(): def f1(f, jf): jf.write(c1) t = jf.new_trans() - t.add(c2, len(c1) - 973) - t.add(c3, len(c1) - 1041) - t.add(c4, len(c1) - 666) - t.add(c5, len(c1) - 3000) + t.add_w(c2, len(c1) - 973) + t.add_w(c3, len(c1) - 1041) + t.add_w(c4, len(c1) - 666) + t.add_w(c5, len(c1) - 3000) t.commit() t.rollback() @@ -156,7 +157,7 @@ def test_n10(): def f1(f, jf): t = jf.new_trans() - t.add(c, 0) + t.add_w(c, 0) t.commit() del t assert content(f.name) == c @@ -234,7 +235,7 @@ def test_n15(): cleanup(n) def test_n16(): - "jopen r/o + jtrans_add + jtrans_commit" + "jopen r/o + jtrans_add_w + jtrans_commit" c = gencontent() # create the file before opening, read-only mode does not create it @@ -245,13 +246,14 @@ def test_n16(): t = jf.new_trans() try: - t.add(c, 80) + t.add_w(c, 80) except IOError: pass else: raise AssertionError try: + # note this fails because there are no ops to commit t.commit() except IOError: pass @@ -286,7 +288,7 @@ def test_n18(): n = f.name t = jf.new_trans() - t.add(c, 80) + t.add_w(c, 80) t.commit() try: t.rollback() @@ -314,14 +316,14 @@ def test_n19(): cleanup(n) def test_n20(): - "jtrans_add of 0 length" + "jtrans_add_w of 0 length" f, jf = bitmp() n = f.name t = jf.new_trans() try: - t.add('', 80) + t.add_w('', 80) except IOError: pass else: @@ -362,4 +364,33 @@ def test_n22(): fsck_verify(n) cleanup(n) +def test_n23(): + "jtrans_add_w + jtrans_add_r" + f, jf = bitmp() + n = f.name + + c1 = gencontent(1000) + c2 = gencontent(1000) + c3 = gencontent(1000) + + buf1 = bytearray(0 for i in range(30)) + buf2 = bytearray(0 for i in range(100)) + + t = jf.new_trans() + t.add_w(c1, 0) + t.add_r(buf1, 0) + t.add_w(c2, len(c2)) + t.add_r(buf2, len(c1) - len(buf2) / 2) + t.add_w(c3, len(c1) + len(c2)) + t.commit() + + assert content(n) == c1 + c2 + c3 + assert buf1 == c1[:len(buf1)] + assert buf2 == c1[-(len(buf2) / 2):] + c2[:len(buf2) / 2] + + del t + del jf + fsck_verify(n) + cleanup(n) +