#!/usr/bin/env python3

"""
This application is a stress tester for libjio. It's not a traditional stress
test like fsx (which can be used to test libjio using the preloading library),
but uses fault injection to check how the library behaves under random
failures.
"""

import sys
import os
import random
import traceback
import libjio

try:
    import fiu
except ImportError:
    print()
    print("Error: unable to load fiu module. This test needs libfiu")
    print("support. Please install libfiu and recompile libjio with FI=1.")
    print()
    raise


#
# Auxiliary stuff
#

# Module-wide source of random bytes, kept open for the life of the process.
randsrc = open('/dev/urandom', mode='rb')


def randbytes(n):
    """Return n bytes of pseudo-random filler.

    For n < 64 the bytes come straight from /dev/urandom. For larger n only
    64 bytes are read and then stretched by repetition, so the result is
    *not* n independent random bytes.
    """
    if n < 64:
        return randsrc.read(n)

    # to avoid reading too much from urandom (which needlessly stresses
    # the kernel), just get a small random stream and repeat it
    s = randsrc.read(64)
    while len(s) < n:
        s += s + s[::-1]
    return s[:n]


def randfrange(maxend, maxsize):
    """Return a random (start, end) pair with 0 <= start <= end < maxend.

    The length (end - start) is always smaller than maxsize, and may be 0.
    maxend and maxsize must both be at least 1.
    """
    start = random.randint(0, maxend - 1)
    size = random.randint(0, (maxend - 1) - start) % maxsize
    return start, start + size


class ConsistencyError(Exception):
    """The file contents match neither the pre-write nor post-write data."""


#
# The test itself
#

class Stresser:
    """Runs random writes through libjio under fault injection.

    Each write is performed in a forked child; the parent records what the
    affected region looked like before and what it should look like after,
    and checks the file ends up in one of those two states.
    """

    def __init__(self, fname, fsize, nops):
        """Open fname through libjio (creating it if needed) and directly.

        fname -- path of the file to operate on
        fsize -- upper bound, in bytes, for the offsets written to
        nops  -- number of write operations run() will perform
        """
        self.fname = fname
        self.fsize = fsize
        self.nops = nops

        # Cap each operation at a quarter of the file or 5 MiB; keep it at
        # least 1 because randfrange() uses it as a modulus.
        self.maxoplen = max(1, min(self.fsize // 4, 5 * 1024 * 1024))

        self.jf = libjio.open(fname,
                              libjio.O_RDWR | libjio.O_CREAT, 0o600)
        # Plain handle used for verification, independent of libjio.
        self.f = open(fname, mode='rb')

        # data used for consistency checks
        self.current_range = (0, 0)
        self.prev_data = b""
        self.new_data = b""

    def pread(self, start, end):
        """Return the file bytes in [start, end), bypassing libjio.

        Reads go through os.pread() on the raw descriptor, so the position
        of self.f is left untouched and no Python-level read buffer can hand
        back data cached before another process wrote to the file. The
        result is shorter than (end - start) if the file ends early.
        """
        fd = self.f.fileno()
        chunks = []
        pos = start
        while pos < end:
            chunk = os.pread(fd, end - pos, pos)
            if not chunk:
                # hit end of file
                break
            chunks.append(chunk)
            pos += len(chunk)
        return b"".join(chunks)

    def randwrite(self):
        """Perform one random write in a child process.

        Returns True if the child's write succeeded, False if it failed
        (normally because of an injected fault). Raises RuntimeError if the
        child did not exit normally.
        """
        # All the bookkeeping is done in the parent, *before* forking:
        # anything assigned in the child is lost when it exits, and the
        # parent's random state would otherwise never advance.
        start, end = randfrange(self.fsize, self.maxoplen)

        # read an extended range so we can check we
        # only wrote what we were supposed to
        estart = max(0, start - 32)
        eend = min(self.fsize, end + 32)
        self.current_range = (estart, eend)
        self.prev_data = self.pread(estart, eend)

        nd = randbytes(end - start)

        # prev_data starts at file offset estart, so slice it with offsets
        # relative to estart. If the file is currently shorter than start,
        # the gap is expected to read back as zero bytes after the write.
        head_len = start - estart
        head = self.prev_data[:head_len].ljust(head_len, b"\0")
        tail = self.prev_data[end - estart:]
        self.new_data = head + nd + tail

        pid = os.fork()
        if pid == 0:
            # child
            try:
                self.jf.pwrite(nd, start)
            except IOError:
                # expected outcome of an injected failure
                sys.exit(1)
            except BaseException:
                # Catch everything: the child must never fall back into
                # the parent's test loop.
                traceback.print_exc()
                sys.exit(1)
            sys.exit(0)

        # parent
        _, status = os.waitpid(pid, 0)
        if not os.WIFEXITED(status):
            raise RuntimeError(status)
        return os.WEXITSTATUS(status) == 0

    def verify(self):
        """Raise ConsistencyError unless the last written region holds
        either its previous contents or the expected new contents."""
        # NOTE: must not use self.jf
        real_data = self.pread(self.current_range[0],
                               self.current_range[1])
        if real_data not in (self.prev_data, self.new_data):
            raise ConsistencyError

    def reopen(self):
        """Drop the libjio handle, run jfsck, verify, and open it again.

        Returns the result of libjio.jfsck().
        """
        self.jf = None
        r = libjio.jfsck(self.fname)
        self.verify()

        libjio.jfsck_cleanup(self.fname)

        self.jf = libjio.open(self.fname,
                              libjio.O_RDWR | libjio.O_CREAT, 0o600)
        return r

    def fiu_enable(self):
        """Enable random fault injection on the jio/* failure points."""
        fiu.enable_random('jio/*', probability=0.02)

    def fiu_disable(self):
        """Disable fault injection on the jio/* failure points."""
        fiu.disable('jio/*')

    def run(self):
        """Run self.nops random writes, printing a progress dot for each.

        Returns the number of writes that failed.
        """
        self.fiu_enable()
        nfailures = 0
        sys.stdout.write(" ")
        for i in range(1, self.nops + 1):
            sys.stdout.write(".")
            if i % 10 == 0:
                sys.stdout.write(" ")
            if i % 50 == 0:
                sys.stdout.write(" %d\n" % i)
                sys.stdout.write(" ")
            sys.stdout.flush()

            if not self.randwrite():
                nfailures += 1
                # recover with fault injection off, then turn it back on
                self.fiu_disable()
                r = self.reopen()
                assert r['total'] <= 1, r
                self.fiu_enable()

            self.verify()
        sys.stdout.write("\n")
        sys.stdout.flush()
        self.fiu_disable()
        return nfailures


#
# Main
#

def usage():
    """Print the command line help."""
    print("""
Use: jiostress <file name> <file size in MB> [<number of operations>]

If the number of operations is not provided, the default (1000) will be
used.""")


def main():
    """Parse the command line, run the stress test and a final jfsck."""
    try:
        fname = sys.argv[1]
        fsize = int(sys.argv[2]) * 1024 * 1024
        nops = 1000
        if len(sys.argv) >= 4:
            nops = int(sys.argv[3])
        if fsize <= 0:
            raise ValueError("file size must be positive")
    except (IndexError, ValueError):
        usage()
        sys.exit(1)

    s = Stresser(fname, fsize, nops)
    print("Running stress test")
    nfailures = s.run()
    print("Stress test completed")
    print(" %d operations" % nops)
    print(" %d simulated failures" % nfailures)

    r = libjio.jfsck(fname)
    assert r['total'] == 0, r
    libjio.jfsck_cleanup(fname)
    print("Final check completed")
    #os.unlink(fname)


if __name__ == '__main__':
    main()