author | Alberto Bertogli
<albertito@blitiri.com.ar> 2009-09-25 00:12:11 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2009-09-25 00:12:11 UTC |
parent | 0b8fe71c5d789d9484905a300afaafe1af25b4a3 |
tests/stress/jiostress | +226 | -30 |
diff --git a/tests/stress/jiostress b/tests/stress/jiostress index b186312..b9a2a4a 100755 --- a/tests/stress/jiostress +++ b/tests/stress/jiostress @@ -9,7 +9,10 @@ failures. import sys import os +import time +import select import random +import fcntl import traceback from optparse import OptionParser import libjio @@ -86,6 +89,124 @@ def pread(fd, start, end): assert c == end - start return r +# +# Output handler, used to get a nice output when using multiple processes +# + +class OutputHandler: + def __init__(self, every): + # fds to read from + self.rs = [] + + # we will report every this number of seconds + self.every = every + + # how many transactions has each child processed; we use the + # read end of the pipe to identify them + self.ntrans = {} + + # like self.ntrans but counts only the failed ones + self.nfailures = {} + + # fd to write to, only relevant in the child + self.w = None + + # p = parent, c = child + self.end = 'p' + + # last transaction number print + self.last_print = 0 + + # time of the last print + self.last_print_time = 0 + + def prefork(self): + r, w = os.pipe() + self.rs.append(r) + self.ntrans[r] = 0 + self.nfailures[r] = 0 + self.w = w + + def child(self): + self.end = 'c' + os.close(self.rs[-1]) + self.rs = [] + + def parent(self): + os.close(self.w) + self.w = None + + SUCCESS = bytes('1', encoding = 'ascii') + FAILURE = bytes('0', encoding = 'ascii') + + def feed(self, success = True): + if success: + os.write(self.w, OutputHandler.SUCCESS) + else: + os.write(self.w, OutputHandler.FAILURE) + + def output_loop(self): + while self.rs: + rr, rw, rx = select.select(self.rs, [], [], 1) + for r in rr: + d = os.read(r, 1) + if not d: + self.rs.remove(r) + else: + self.ntrans[r] += 1 + if d == OutputHandler.FAILURE: + self.nfailures[r] += 1 + + self.cond_print() + self.print() + return sum(self.ntrans.values()), sum(self.nfailures.values()) + + def cond_print(self): + if time.time() - self.last_print_time >= self.every: + self.print() + + def print(self): + self.last_print_time = time.time() + for r in sorted(self.ntrans): + print("%4d" % self.ntrans[r], end = ' ') + print() + + +# +# Lock manager, used to lock ranges between multiple processes +# +# We can't lock the real file because that would ruin libjio's locking, so we +# create a new file, remove it, and use fcntl locking. Not very elegant but it +# does the trick. +# + +class VoidLockManager: + def __init__(self): + pass + + def lock(self, start, end): + pass + + def unlock(self, start, end): + pass + +class LockManager: + def __init__(self): + fname = "/tmp/js-lock-tmp." + str(os.getpid()) + self.fd = open(fname, 'w+') + os.unlink(fname) + + def lock(self, start, end): + #print(os.getpid(), '\tlock:', start, end) + #sys.stdout.flush() + fcntl.lockf(self.fd, fcntl.LOCK_EX, end - start, start) + + def unlock(self, start, end): + #print(os.getpid(), '\tunlock:', start, end) + #sys.stdout.flush() + fcntl.lockf(self.fd, fcntl.LOCK_UN, end - start, start) + + # # A range of bytes inside a file, used inside the transactions # @@ -94,7 +215,7 @@ def pread(fd, start, end): # class Range: - def __init__(self, fsize, maxlen): + def __init__(self, fsize, maxlen, lockmgr): # public self.start, self.end = randfrange(fsize, maxlen) self.new_data = None @@ -104,12 +225,21 @@ class Range: self.prev_data = None self.new_data_ctx = None self.read_buf = None + self.lockmgr = lockmgr + self.locked = False # read an extended range so we can check we # only wrote what we were supposed to self.ext_start = max(0, self.start - 32) self.ext_end = min(fsize, self.end + 32) + def __lt__(self, other): + return self.ext_start < other.ext_start + + def __del__(self): + if self.locked: + self.lockmgr.unlock(self.ext_start, self.ext_end) + def overlaps(self, other): if (other.ext_start <= self.ext_start <= other.ext_end) or \ (other.ext_start <= self.ext_end <= other.ext_end) or \ @@ -121,6 +251,8 @@ class Range: def prepare_r(self): self.type = 'r' self.read_buf = bytearray(self.end - self.start) + self.lockmgr.lock(self.ext_start, self.ext_end) + self.locked = True def verify_r(self, fd): real_data = pread(fd, self.start, self.end) @@ -131,6 +263,9 @@ class Range: def prepare_w(self, fd): self.type = 'w' + self.lockmgr.lock(self.ext_start, self.ext_end) + self.locked = True + self.prev_data = pread(fd, self.ext_start, self.ext_end) self.new_data = getbytes(self.end - self.start) @@ -173,7 +308,7 @@ class Range: class T_base: "Interface for the transaction types" - def __init__(self, f, jf, fsize): + def __init__(self, f, jf, fsize, lockmgr, do_verify): pass def prepare(self): @@ -186,13 +321,14 @@ class T_base: pass class T_jwrite (T_base): - def __init__(self, f, jf, fsize): + def __init__(self, f, jf, fsize, lockmgr, do_verify): self.f = f self.jf = jf self.fsize = fsize + self.do_verify = do_verify self.maxoplen = min(int(fsize / 256), 16 * 1024 * 1024) - self.range = Range(self.fsize, self.maxoplen) + self.range = Range(self.fsize, self.maxoplen, lockmgr) def prepare(self): self.range.prepare_w(self.f) @@ -201,13 +337,16 @@ class T_jwrite (T_base): self.jf.pwrite(self.range.new_data, self.range.start) def verify(self, write_only = False): + if not self.do_verify: + return self.range.verify(self.f) class T_writeonly (T_base): - def __init__(self, f, jf, fsize): + def __init__(self, f, jf, fsize, lockmgr, do_verify): self.f = f self.jf = jf self.fsize = fsize + self.do_verify = do_verify # favour many small ops self.maxoplen = 1 * 1024 * 1024 @@ -217,7 +356,7 @@ class T_writeonly (T_base): c = 0 while len(self.ranges) < self.nops and c < self.nops * 1.25: - candidate = Range(self.fsize, self.maxoplen) + candidate = Range(self.fsize, self.maxoplen, lockmgr) safe = True for r in self.ranges: if candidate.overlaps(r): @@ -227,6 +366,10 @@ class T_writeonly (T_base): self.ranges.append(candidate) c += 1 + # sort the transactions so there's no risk of internal + # deadlocks via the lock manager + self.ranges.sort() + def prepare(self): for r in self.ranges: r.prepare_w(self.f) @@ -238,6 +381,9 @@ class T_writeonly (T_base): t.commit() def verify(self, write_only = False): + if not self.do_verify: + return + try: for r in self.ranges: r.verify(self.f) @@ -250,8 +396,8 @@ class T_writeonly (T_base): raise class T_readwrite (T_writeonly): - def __init__(self, f, jf, fsize): - T_writeonly.__init__(self, f, jf, fsize) + def __init__(self, f, jf, fsize, lockmgr, do_verify): + T_writeonly.__init__(self, f, jf, fsize, lockmgr, do_verify) self.read_ranges = [] def prepare(self): @@ -271,6 +417,9 @@ class T_readwrite (T_writeonly): t.commit() def verify(self, write_only = False): + if not self.do_verify: + return + try: for r in self.ranges: if write_only and r.type == 'r': @@ -292,12 +441,16 @@ t_list = [T_jwrite, T_writeonly, T_readwrite] # class Stresser: - def __init__(self, fname, fsize, nops, use_fi, use_as): + def __init__(self, fname, fsize, nops, use_fi, use_as, output, + lockmgr, do_verify): self.fname = fname self.fsize = fsize self.nops = nops self.use_fi = use_fi self.use_as = use_as + self.output = output + self.lockmgr = lockmgr + self.do_verify = do_verify jflags = 0 if use_as: @@ -395,31 +548,25 @@ class Stresser: def run(self): nfailures = 0 - sys.stdout.write(" ") for i in range(1, self.nops + 1): - sys.stdout.write(".") - if i % 10 == 0: - sys.stdout.write(" ") - if i % 50 == 0: - sys.stdout.write(" %d\n" % i) - sys.stdout.write(" ") - sys.stdout.flush() - trans = random.choice(t_list)(self.f, self.jf, - self.fsize) + self.fsize, self.lockmgr, + self.do_verify) if self.use_fi: r = self.apply_fork(trans) else: r = self.apply(trans) - if not r: + + if r: + self.output.feed(success = True) + else: + self.output.feed(success = False) nfailures += 1 r = self.reopen(trans) trans.verify(write_only = True) - sys.stdout.write("\n") - sys.stdout.flush() return nfailures @@ -427,6 +574,41 @@ class Stresser: # Main # +def run_stressers(nproc, fname, fsize, nops, use_fi, use_as, output, lockmgr, + do_verify): + pids = [] + print("Launching stress test") + for i in range(nproc): + # Calculate how many operations will this child perform. The + # last one will work a little more so we get exactly nops. + # Note we prefer to work extra in the end rather than having + # the last process with 0 child_nops, that's why we use int() + # instead of round() or ceil(). + child_nops = int(nops / nproc) + if i == nproc - 1: + child_nops = nops - int(nops / nproc) * i + + output.prefork() + sys.stdout.flush() + pid = os.fork() + if pid == 0: + # child + output.child() + s = Stresser(fname, fsize, child_nops, use_fi, use_as, + output, lockmgr, do_verify) + s.run() + sys.exit(0) + else: + output.parent() + pids.append(pid) + + print("Launched stress tests") + totalops, nfailures = output.output_loop() + print("Stress test completed, waiting for children") + for pid in pids: + os.waitpid(pid, 0) + print(" %d operations" % totalops) + print(" %d simulated failures" % nfailures) def main(): usage = "Use: %prog [options] <file name> <file size in Mb>" @@ -440,6 +622,16 @@ def main(): parser.add_option("-a", "--as", dest = "use_as", action = "store_true", default = False, help = "use J_LINGER + autosync (conflicts with --fi)") + parser.add_option("-p", "--nproc", dest = "nproc", type = "int", + default = 1, + help = "number of processes (defaults to %default)") + parser.add_option("", "--no-internal-lock", + dest = "use_internal_locks", action = "store_false", + default = True, + help = "do not lock internally, disables verification") + parser.add_option("", "--no-verify", dest = "do_verify", + action = "store_false", default = True, + help = "do not perform verifications") options, args = parser.parse_args() @@ -458,14 +650,18 @@ def main(): print("Error: --fi and --as cannot be used together") sys.exit(1) - s = Stresser(fname, fsize, options.nops, options.use_fi, - options.use_as) - print("Running stress test") - nfailures = s.run() - del s - print("Stress test completed") - print(" %d operations" % options.nops) - print(" %d simulated failures" % nfailures) + if not options.use_internal_locks: + options.do_verify = False + + output = OutputHandler(every = 2) + if options.use_internal_locks: + lockmgr = LockManager() + else: + lockmgr = VoidLockManager() + + run_stressers(options.nproc, fname, fsize, options.nops, + options.use_fi, options.use_as, output, lockmgr, + options.do_verify) r = jfsck(fname) print("Final check completed")