00001
00002
00003
00004
00005
00006 #include <sys/types.h>
00007 #include <sys/stat.h>
00008 #include <fcntl.h>
00009 #include <unistd.h>
00010 #include <stdlib.h>
00011 #include <limits.h>
00012 #include <string.h>
00013 #include <libgen.h>
00014 #include <stdio.h>
00015 #include <dirent.h>
00016 #include <errno.h>
00017 #include <sys/mman.h>
00018
00019 #include "libjio.h"
00020 #include "common.h"
00021 #include "compat.h"
00022 #include "journal.h"
00023 #include "trans.h"
00024
00025
00026
00027
00028
00029
00030
00031 struct jtrans *jtrans_new(struct jfs *fs, unsigned int flags)
00032 {
00033 pthread_mutexattr_t attr;
00034 struct jtrans *ts;
00035
00036 ts = malloc(sizeof(struct jtrans));
00037 if (ts == NULL)
00038 return NULL;
00039
00040 ts->fs = fs;
00041 ts->id = 0;
00042 ts->flags = fs->flags | flags;
00043 ts->op = NULL;
00044 ts->numops_r = 0;
00045 ts->numops_w = 0;
00046 ts->len_w = 0;
00047
00048 pthread_mutexattr_init(&attr);
00049 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
00050 pthread_mutex_init(&(ts->lock), &attr);
00051 pthread_mutexattr_destroy(&attr);
00052
00053 return ts;
00054 }
00055
00056
00057 void jtrans_free(struct jtrans *ts)
00058 {
00059 struct operation *tmpop;
00060
00061 ts->fs = NULL;
00062
00063 while (ts->op != NULL) {
00064 tmpop = ts->op->next;
00065
00066 if (ts->op->buf && ts->op->direction == D_WRITE)
00067 free(ts->op->buf);
00068 if (ts->op->pdata)
00069 free(ts->op->pdata);
00070 free(ts->op);
00071
00072 ts->op = tmpop;
00073 }
00074 pthread_mutex_destroy(&(ts->lock));
00075
00076 free(ts);
00077 }
00078
00081 static int lock_file_ranges(struct jtrans *ts, int mode)
00082 {
00083 off_t lr;
00084 struct operation *op;
00085
00086 if (ts->flags & J_NOLOCK)
00087 return 0;
00088
00089 for (op = ts->op; op != NULL; op = op->next) {
00090 if (mode == F_LOCKW) {
00091 lr = plockf(ts->fs->fd, F_LOCKW, op->offset, op->len);
00092 if (lr == -1)
00093 goto error;
00094 op->locked = 1;
00095 } else if (mode == F_UNLOCK && op->locked) {
00096 lr = plockf(ts->fs->fd, F_UNLOCK, op->offset,
00097 op->len);
00098 if (lr == -1)
00099 goto error;
00100 op->locked = 0;
00101 }
00102 }
00103
00104 return 0;
00105
00106 error:
00107 return -1;
00108 }
00109
00112 static int operation_read_prev(struct jtrans *ts, struct operation *op)
00113 {
00114 ssize_t rv;
00115
00116 op->pdata = malloc(op->len);
00117 if (op->pdata == NULL)
00118 return -1;
00119
00120 rv = spread(ts->fs->fd, op->pdata, op->len,
00121 op->offset);
00122 if (rv < 0) {
00123 free(op->pdata);
00124 op->pdata = NULL;
00125 return -1;
00126 }
00127
00128 op->plen = op->len;
00129 if (rv < op->len) {
00130
00131
00132 op->plen = rv;
00133 }
00134
00135 return 0;
00136 }
00137
00139 static int jtrans_add_common(struct jtrans *ts, const void *buf, size_t count,
00140 off_t offset, enum op_direction direction)
00141 {
00142 struct operation *op, *tmpop;
00143
00144 op = tmpop = NULL;
00145
00146 pthread_mutex_lock(&(ts->lock));
00147
00148
00149 if ((ts->flags & J_RDONLY) && direction == D_WRITE)
00150 goto error;
00151
00152 if (count == 0)
00153 goto error;
00154
00155 if ((long long) ts->len_w + count > MAX_TSIZE)
00156 goto error;
00157
00158 op = malloc(sizeof(struct operation));
00159 if (op == NULL)
00160 goto error;
00161
00162 if (direction == D_WRITE) {
00163 op->buf = malloc(count);
00164 if (op->buf == NULL)
00165 goto error;
00166
00167 ts->numops_w++;
00168 } else {
00169 ts->numops_r++;
00170 }
00171
00172
00173 op->next = NULL;
00174 if (ts->op == NULL) {
00175 ts->op = op;
00176 op->prev = NULL;
00177 } else {
00178 for (tmpop = ts->op; tmpop->next != NULL; tmpop = tmpop->next)
00179 ;
00180 tmpop->next = op;
00181 op->prev = tmpop;
00182 }
00183
00184 pthread_mutex_unlock(&(ts->lock));
00185
00186 op->len = count;
00187 op->offset = offset;
00188 op->plen = 0;
00189 op->pdata = NULL;
00190 op->locked = 0;
00191 op->direction = direction;
00192
00193 if (direction == D_WRITE) {
00194 memcpy(op->buf, buf, count);
00195
00196 if (!(ts->flags & J_NOROLLBACK)) {
00197
00198
00199 posix_fadvise(ts->fs->fd, offset, count,
00200 POSIX_FADV_WILLNEED);
00201 }
00202 } else {
00203
00204
00205
00206 op->buf = (void *) buf;
00207
00208
00209
00210
00211
00212 posix_fadvise(ts->fs->fd, offset, count, POSIX_FADV_WILLNEED);
00213 }
00214
00215 return 0;
00216
00217 error:
00218 pthread_mutex_unlock(&(ts->lock));
00219
00220 if (op && direction == D_WRITE)
00221 free(op->buf);
00222 free(op);
00223
00224 return -1;
00225 }
00226
00227 int jtrans_add_r(struct jtrans *ts, void *buf, size_t count, off_t offset)
00228 {
00229 return jtrans_add_common(ts, buf, count, offset, D_READ);
00230 }
00231
00232 int jtrans_add_w(struct jtrans *ts, const void *buf, size_t count,
00233 off_t offset)
00234 {
00235 return jtrans_add_common(ts, buf, count, offset, D_WRITE);
00236 }
00237
00238
00239
00240 ssize_t jtrans_commit(struct jtrans *ts)
00241 {
00242 ssize_t r, retval = -1;
00243 struct operation *op;
00244 struct jlinger *linger;
00245 jop_t *jop = NULL;
00246 size_t written = 0;
00247
00248 pthread_mutex_lock(&(ts->lock));
00249
00250
00251 ts->flags = ts->flags & ~J_COMMITTED;
00252 ts->flags = ts->flags & ~J_ROLLBACKED;
00253
00254 if (ts->numops_r + ts->numops_w == 0)
00255 goto exit;
00256
00257
00258 if (ts->numops_w && (ts->flags & J_RDONLY))
00259 goto exit;
00260
00261
00262
00263
00264
00265
00266
00267
00268 if (lock_file_ranges(ts, F_LOCKW) != 0)
00269 goto unlock_exit;
00270
00271
00272
00273 if (ts->numops_w) {
00274 jop = journal_new(ts->fs, ts->flags);
00275 if (jop == NULL)
00276 goto unlock_exit;
00277 }
00278
00279 for (op = ts->op; op != NULL; op = op->next) {
00280 if (op->direction == D_READ)
00281 continue;
00282
00283 r = journal_add_op(jop, op->buf, op->len, op->offset);
00284 if (r != 0)
00285 goto unlink_exit;
00286
00287 fiu_exit_on("jio/commit/tf_opdata");
00288 }
00289
00290 if (jop)
00291 journal_pre_commit(jop);
00292
00293 fiu_exit_on("jio/commit/tf_data");
00294
00295 if (!(ts->flags & J_NOROLLBACK)) {
00296 for (op = ts->op; op != NULL; op = op->next) {
00297 if (op->direction == D_READ)
00298 continue;
00299
00300 r = operation_read_prev(ts, op);
00301 if (r < 0)
00302 goto unlink_exit;
00303 }
00304 }
00305
00306 if (jop) {
00307 r = journal_commit(jop);
00308 if (r < 0)
00309 goto unlink_exit;
00310 }
00311
00312
00313 written = 0;
00314 for (op = ts->op; op != NULL; op = op->next) {
00315 if (op->direction == D_READ) {
00316 r = spread(ts->fs->fd, op->buf, op->len, op->offset);
00317 if (r != op->len)
00318 goto rollback_exit;
00319
00320 continue;
00321 }
00322
00323
00324
00325 r = spwrite(ts->fs->fd, op->buf, op->len, op->offset);
00326 if (r != op->len)
00327 goto rollback_exit;
00328
00329 written += r;
00330
00331 if (have_sync_range && !(ts->flags & J_LINGER)) {
00332 r = sync_range_submit(ts->fs->fd, op->len,
00333 op->offset);
00334 if (r != 0)
00335 goto rollback_exit;
00336 }
00337
00338 fiu_exit_on("jio/commit/wrote_op");
00339 }
00340
00341 fiu_exit_on("jio/commit/wrote_all_ops");
00342
00343 if (jop && (ts->flags & J_LINGER)) {
00344 struct jlinger *lp;
00345
00346 linger = malloc(sizeof(struct jlinger));
00347 if (linger == NULL)
00348 goto rollback_exit;
00349
00350 linger->jop = jop;
00351 linger->next = NULL;
00352
00353 pthread_mutex_lock(&(ts->fs->ltlock));
00354
00355
00356 if (ts->fs->ltrans == NULL) {
00357 ts->fs->ltrans = linger;
00358 } else {
00359 lp = ts->fs->ltrans;
00360 while (lp->next != NULL)
00361 lp = lp->next;
00362 lp->next = linger;
00363 }
00364
00365 ts->fs->ltrans_len += written;
00366 autosync_check(ts->fs);
00367
00368 pthread_mutex_unlock(&(ts->fs->ltlock));
00369
00370
00371 jop = NULL;
00372 } else if (jop) {
00373 if (have_sync_range) {
00374 for (op = ts->op; op != NULL; op = op->next) {
00375 if (op->direction == D_READ)
00376 continue;
00377
00378 r = sync_range_wait(ts->fs->fd, op->len,
00379 op->offset);
00380 if (r != 0)
00381 goto rollback_exit;
00382 }
00383 } else {
00384 if (fdatasync(ts->fs->fd) != 0)
00385 goto rollback_exit;
00386 }
00387 }
00388
00389
00390 ts->flags = ts->flags | J_COMMITTED;
00391
00392 retval = 1;
00393
00394 rollback_exit:
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404 if (jop && !(ts->flags & J_COMMITTED) &&
00405 !(ts->flags & J_ROLLBACKING)) {
00406 r = ts->flags;
00407 ts->flags = ts->flags | J_NOLOCK | J_ROLLBACKING;
00408 if (jtrans_rollback(ts) >= 0) {
00409 ts->flags = r | J_ROLLBACKED;
00410 retval = -1;
00411 } else {
00412 ts->flags = r;
00413 retval = -2;
00414 }
00415 }
00416
00417 unlink_exit:
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427 if (jop) {
00428
00429
00430 int data_is_safe = (ts->flags & J_COMMITTED) ||
00431 (ts->flags & J_ROLLBACKED);
00432 r = journal_free(jop, data_is_safe ? 1 : 0);
00433 if (r != 0)
00434 retval = -2;
00435
00436 jop = NULL;
00437 }
00438
00439 unlock_exit:
00440
00441
00442
00443 lock_file_ranges(ts, F_UNLOCK);
00444
00445 exit:
00446 pthread_mutex_unlock(&(ts->lock));
00447
00448 return retval;
00449 }
00450
00451
00452 ssize_t jtrans_rollback(struct jtrans *ts)
00453 {
00454 ssize_t rv;
00455 struct jtrans *newts;
00456 struct operation *op, *curop, *lop;
00457
00458 newts = jtrans_new(ts->fs, 0);
00459 if (newts == NULL)
00460 return -1;
00461
00462 newts->flags = ts->flags;
00463 newts->numops_r = 0;
00464 newts->numops_w = 0;
00465 newts->len_w = 0;
00466
00467 if (ts->op == NULL || ts->flags & J_NOROLLBACK) {
00468 rv = -1;
00469 goto exit;
00470 }
00471
00472
00473 for (op = ts->op; op->next != NULL; op = op->next)
00474 ;
00475
00476
00477 for ( ; op != NULL; op = op->prev) {
00478 if (op->direction == D_READ)
00479 continue;
00480
00481
00482
00483
00484
00485
00486
00487
00488 if (op->plen < op->len) {
00489 rv = ftruncate(ts->fs->fd, op->offset + op->plen);
00490 if (rv != 0)
00491 goto exit;
00492 }
00493
00494
00495 curop = malloc(sizeof(struct operation));
00496 if (curop == NULL) {
00497 rv = -1;
00498 goto exit;
00499 }
00500
00501 curop->offset = op->offset;
00502 curop->len = op->plen;
00503 curop->buf = op->pdata;
00504 curop->plen = op->plen;
00505 curop->pdata = op->pdata;
00506 curop->direction = op->direction;
00507 curop->locked = 0;
00508
00509 newts->numops_w++;
00510 newts->len_w += curop->len;
00511
00512
00513 if (newts->op == NULL) {
00514 newts->op = curop;
00515 curop->prev = NULL;
00516 curop->next = NULL;
00517 } else {
00518 for (lop = newts->op; lop->next != NULL; lop = lop->next)
00519 ;
00520 lop->next = curop;
00521 curop->prev = lop;
00522 curop->next = NULL;
00523 }
00524 }
00525
00526 rv = jtrans_commit(newts);
00527
00528 exit:
00529
00530 for (curop = newts->op; curop != NULL; curop = curop->next) {
00531 curop->buf = NULL;
00532 curop->pdata = NULL;
00533 }
00534 jtrans_free(newts);
00535
00536 return rv;
00537 }
00538
00539
00540
00541
00542
00543
00544
00545 struct jfs *jopen(const char *name, int flags, int mode, unsigned int jflags)
00546 {
00547 int jfd, rv;
00548 unsigned int t;
00549 char jdir[PATH_MAX], jlockfile[PATH_MAX];
00550 struct stat sinfo;
00551 pthread_mutexattr_t attr;
00552 struct jfs *fs;
00553
00554 fs = malloc(sizeof(struct jfs));
00555 if (fs == NULL)
00556 return NULL;
00557
00558 fs->fd = -1;
00559 fs->jfd = -1;
00560 fs->jdir = NULL;
00561 fs->jdirfd = -1;
00562 fs->jmap = MAP_FAILED;
00563 fs->as_cfg = NULL;
00564
00565
00566
00567
00568
00569
00570 if ((flags & O_WRONLY) || (flags & O_RDWR)) {
00571 flags = flags & ~O_WRONLY;
00572 flags = flags & ~O_RDONLY;
00573 flags = flags | O_RDWR;
00574 } else {
00575 jflags = jflags | J_RDONLY;
00576 }
00577
00578 fs->name = strdup(name);
00579 fs->flags = jflags;
00580 fs->open_flags = flags;
00581 fs->ltrans = NULL;
00582 fs->ltrans_len = 0;
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597 pthread_mutexattr_init(&attr);
00598 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
00599 pthread_mutex_init( &(fs->lock), &attr);
00600 pthread_mutex_init( &(fs->ltlock), &attr);
00601 pthread_mutexattr_destroy(&attr);
00602
00603 fs->fd = open(name, flags, mode);
00604 if (fs->fd < 0)
00605 goto error_exit;
00606
00607
00608 if (jflags & J_RDONLY) {
00609 return fs;
00610 }
00611
00612 if (!get_jdir(name, jdir))
00613 goto error_exit;
00614 rv = mkdir(jdir, 0750);
00615 rv = lstat(jdir, &sinfo);
00616 if (rv < 0 || !S_ISDIR(sinfo.st_mode))
00617 goto error_exit;
00618
00619 fs->jdir = (char *) malloc(strlen(jdir) + 1);
00620 if (fs->jdir == NULL)
00621 goto error_exit;
00622 strcpy(fs->jdir, jdir);
00623
00624
00625
00626 fs->jdirfd = open(jdir, O_RDONLY);
00627 if (fs->jdirfd < 0)
00628 goto error_exit;
00629
00630 snprintf(jlockfile, PATH_MAX, "%s/lock", jdir);
00631 jfd = open(jlockfile, O_RDWR | O_CREAT, 0600);
00632 if (jfd < 0)
00633 goto error_exit;
00634
00635 fs->jfd = jfd;
00636
00637
00638
00639
00640 plockf(jfd, F_LOCKW, 0, 0);
00641 lstat(jlockfile, &sinfo);
00642 if (sinfo.st_size != sizeof(unsigned int)) {
00643 t = 0;
00644 rv = spwrite(jfd, &t, sizeof(t), 0);
00645 if (rv != sizeof(t)) {
00646 goto error_exit;
00647 }
00648 }
00649 plockf(jfd, F_UNLOCK, 0, 0);
00650
00651 fs->jmap = (unsigned int *) mmap(NULL, sizeof(unsigned int),
00652 PROT_READ | PROT_WRITE, MAP_SHARED, jfd, 0);
00653 if (fs->jmap == MAP_FAILED)
00654 goto error_exit;
00655
00656 return fs;
00657
00658 error_exit:
00659
00660
00661
00662 jclose(fs);
00663 return NULL;
00664 }
00665
00666
00667 int jsync(struct jfs *fs)
00668 {
00669 int rv;
00670 struct jlinger *ltmp;
00671
00672 if (fs->fd < 0)
00673 return -1;
00674
00675 rv = fdatasync(fs->fd);
00676 if (rv != 0)
00677 return rv;
00678
00679
00680
00681
00682 pthread_mutex_lock(&(fs->ltlock));
00683 while (fs->ltrans != NULL) {
00684 fiu_exit_on("jio/jsync/pre_unlink");
00685 if (journal_free(fs->ltrans->jop, 1) != 0) {
00686 pthread_mutex_unlock(&(fs->ltlock));
00687 return -1;
00688 }
00689
00690 ltmp = fs->ltrans->next;
00691 free(fs->ltrans);
00692 fs->ltrans = ltmp;
00693 }
00694
00695 fs->ltrans_len = 0;
00696 pthread_mutex_unlock(&(fs->ltlock));
00697 return 0;
00698 }
00699
00700
00701 int jmove_journal(struct jfs *fs, const char *newpath)
00702 {
00703 int ret;
00704 char *oldpath, jlockfile[PATH_MAX], oldjlockfile[PATH_MAX];
00705
00706
00707
00708
00709
00710
00711 jsync(fs);
00712
00713 oldpath = fs->jdir;
00714 snprintf(oldjlockfile, PATH_MAX, "%s/lock", fs->jdir);
00715
00716 fs->jdir = (char *) malloc(strlen(newpath) + 1);
00717 if (fs->jdir == NULL)
00718 return -1;
00719 strcpy(fs->jdir, newpath);
00720
00721 ret = rename(oldpath, newpath);
00722 if (ret == -1 && (errno == ENOTEMPTY || errno == EEXIST) ) {
00723
00724
00725
00726 close(fs->jdirfd);
00727 fs->jdirfd = open(newpath, O_RDONLY);
00728 if (fs->jdirfd < 0)
00729 goto exit;
00730
00731 snprintf(jlockfile, PATH_MAX, "%s/lock", newpath);
00732 ret = rename(oldjlockfile, jlockfile);
00733 if (ret < 0)
00734 goto exit;
00735
00736
00737 unlink(oldjlockfile);
00738 ret = rmdir(oldpath);
00739 if (ret == -1) {
00740
00741
00742 goto exit;
00743 }
00744
00745 ret = 0;
00746 }
00747
00748 exit:
00749 free(oldpath);
00750 return ret;
00751 }
00752
00753
00754 int jclose(struct jfs *fs)
00755 {
00756 int ret;
00757
00758 ret = 0;
00759
00760 if (jfs_autosync_stop(fs))
00761 ret = -1;
00762
00763 if (! (fs->flags & J_RDONLY)) {
00764 if (jsync(fs))
00765 ret = -1;
00766 if (fs->jfd < 0 || close(fs->jfd))
00767 ret = -1;
00768 if (fs->jdirfd < 0 || close(fs->jdirfd))
00769 ret = -1;
00770 if (fs->jmap != MAP_FAILED)
00771 munmap(fs->jmap, sizeof(unsigned int));
00772 }
00773
00774 if (fs->fd < 0 || close(fs->fd))
00775 ret = -1;
00776 if (fs->name)
00777
00778 free(fs->name);
00779 if (fs->jdir)
00780 free(fs->jdir);
00781
00782 pthread_mutex_destroy(&(fs->lock));
00783
00784 free(fs);
00785
00786 return ret;
00787 }
00788