git » libjio » commit 3da17d5

tests/stress: Support running multiple processes

author Alberto Bertogli
2009-09-25 00:12:11 UTC
committer Alberto Bertogli
2009-09-25 00:12:11 UTC
parent 0b8fe71c5d789d9484905a300afaafe1af25b4a3

tests/stress: Support running multiple processes

This patch adds support for running the stress test with multiple
processes. Besides the option to set the number of processes (--nproc), it
also adds two new options, --no-internal-lock and --no-verify, which are
useful for running several instances of the stress test in parallel.

Signed-off-by: Alberto Bertogli <albertito@blitiri.com.ar>

tests/stress/jiostress +226 -30

diff --git a/tests/stress/jiostress b/tests/stress/jiostress
index b186312..b9a2a4a 100755
--- a/tests/stress/jiostress
+++ b/tests/stress/jiostress
@@ -9,7 +9,10 @@ failures.
 
 import sys
 import os
+import time
+import select
 import random
+import fcntl
 import traceback
 from optparse import OptionParser
 import libjio
@@ -86,6 +89,124 @@ def pread(fd, start, end):
 	assert c == end - start
 	return r
 
+#
+# Output handler, used to get a nice output when using multiple processes
+#
+
+class OutputHandler:
+	"""Collect per-child progress over pipes and print periodic reports.
+
+	Usage: call prefork() before each fork(), then child() in the child or
+	parent() in the parent. Each child reports every transaction with
+	feed(); the parent runs output_loop() until all children have closed
+	their pipes.
+	"""
+
+	# one byte is written to the pipe per transaction
+	SUCCESS = b'1'
+	FAILURE = b'0'
+
+	def __init__(self, every):
+		# read ends of the children's pipes (parent only)
+		self.rs = []
+
+		# minimum number of seconds between progress reports
+		self.every = every
+
+		# how many transactions each child has processed, keyed by the
+		# read end of its pipe
+		self.ntrans = {}
+
+		# like self.ntrans, but counting only the failed ones
+		self.nfailures = {}
+
+		# write end of the pipe; only relevant in the child
+		self.w = None
+
+		# 'p' = parent, 'c' = child
+		self.end = 'p'
+
+		# not updated by this class; kept for compatibility
+		self.last_print = 0
+
+		# time of the last report
+		self.last_print_time = 0
+
+	def prefork(self):
+		"""Create the pipe for the next child; call right before fork()."""
+		r, w = os.pipe()
+		self.rs.append(r)
+		self.ntrans[r] = 0
+		self.nfailures[r] = 0
+		self.w = w
+
+	def child(self):
+		"""Switch to child mode: close every inherited read end.
+
+		The child only writes. Besides the read end of its own pipe, it
+		also inherited the read ends of all previously forked siblings,
+		which it has no use for.
+		"""
+		self.end = 'c'
+		for r in self.rs:
+			os.close(r)
+		self.rs = []
+
+	def parent(self):
+		"""Switch to parent mode: close the write end handed to the child.
+
+		Closing the parent's copy is what lets output_loop() see EOF once
+		the child is done, and keeps it out of later children.
+		"""
+		os.close(self.w)
+		self.w = None
+
+	def feed(self, success = True):
+		"""Report one transaction to the parent (called in the child)."""
+		if success:
+			os.write(self.w, OutputHandler.SUCCESS)
+		else:
+			os.write(self.w, OutputHandler.FAILURE)
+
+	def output_loop(self):
+		"""Consume the children's reports until every pipe is closed.
+
+		Prints a progress line whenever self.every seconds have passed
+		since the previous one, and a final line at the end.
+
+		Returns a (total transactions, total failures) tuple.
+		"""
+		while self.rs:
+			rr, _, _ = select.select(self.rs, [], [], 1)
+			for r in rr:
+				# drain whatever is available instead of doing one
+				# select() round trip per byte; each byte is one
+				# transaction
+				d = os.read(r, 4096)
+				if not d:
+					# EOF: the child has finished
+					self.rs.remove(r)
+					os.close(r)
+				else:
+					self.ntrans[r] += len(d)
+					self.nfailures[r] += d.count(
+						OutputHandler.FAILURE)
+
+			self.cond_print()
+		self.print()
+		return sum(self.ntrans.values()), sum(self.nfailures.values())
+
+	def cond_print(self):
+		"""Print a progress line if self.every seconds have elapsed."""
+		if time.time() - self.last_print_time >= self.every:
+			self.print()
+
+	def print(self):
+		"""Print one line with the transaction count of each child."""
+		# inside this method, print() is the builtin, not this method
+		self.last_print_time = time.time()
+		for r in sorted(self.ntrans):
+			print("%4d" % self.ntrans[r], end = ' ')
+		print()
+		# stdout may be block buffered (pipe or file); flush so the
+		# progress is visible while the test is running
+		sys.stdout.flush()
+
+
+#
+# Lock manager, used to lock ranges between multiple processes
+#
+# We can't lock the real file because that would ruin libjio's locking, so we
+# create a new file, remove it, and use fcntl locking. Not very elegant but it
+# does the trick.
+#
+
+class VoidLockManager:
+	"""Lock manager that never locks anything.
+
+	Used instead of LockManager when internal locking is disabled; it has
+	the same interface, so callers do not need to tell them apart.
+	"""
+
+	def __init__(self):
+		"""Nothing to set up."""
+
+	def lock(self, start, end):
+		"""Do nothing; the range [start, end) is not locked."""
+
+	def unlock(self, start, end):
+		"""Do nothing; there is never anything to release."""
+
+class LockManager:
+	"""Serialize access to byte ranges between processes with fcntl locks.
+
+	The locks are taken on a private temporary file, not on the file
+	under test, so libjio's own locking is left alone. The descriptor is
+	inherited across fork(), and fcntl record locks belong to the process
+	holding them, so a range locked by one process blocks the others.
+	"""
+	def __init__(self):
+		import tempfile
+
+		# An anonymous temporary file: it is created securely (unlike a
+		# predictable name in /tmp, which someone else could pre-create
+		# or replace with a symlink) and on POSIX it is already
+		# unlinked, so nothing is left behind.
+		self.fd = tempfile.TemporaryFile()
+
+	def lock(self, start, end):
+		"""Block until [start, end) is exclusively locked."""
+		if end <= start:
+			# a length of 0 would mean "until EOF" to lockf()
+			return
+		fcntl.lockf(self.fd, fcntl.LOCK_EX, end - start, start)
+
+	def unlock(self, start, end):
+		"""Release a range previously locked with lock()."""
+		if end <= start:
+			# matches lock(): empty ranges are never locked
+			return
+		fcntl.lockf(self.fd, fcntl.LOCK_UN, end - start, start)
+
+
 #
 # A range of bytes inside a file, used inside the transactions
 #
@@ -94,7 +215,7 @@ def pread(fd, start, end):
 #
 
 class Range:
-	def __init__(self, fsize, maxlen):
+	def __init__(self, fsize, maxlen, lockmgr):
 		# public
 		self.start, self.end = randfrange(fsize, maxlen)
 		self.new_data = None
@@ -104,12 +225,21 @@ class Range:
 		self.prev_data = None
 		self.new_data_ctx = None
 		self.read_buf = None
+		self.lockmgr = lockmgr
+		self.locked = False
 
 		# read an extended range so we can check we
 		# only wrote what we were supposed to
 		self.ext_start = max(0, self.start - 32)
 		self.ext_end = min(fsize, self.end + 32)
 
+	def __lt__(self, other):
+		# Order ranges by the start of their extended span (the span
+		# that prepare_r()/prepare_w() lock), so that sorting a list of
+		# ranges makes them get locked in ascending offset order.
+		return self.ext_start < other.ext_start
+
+	def __del__(self):
+		# Release the lock taken by prepare_r()/prepare_w(), if any.
+		# getattr() is needed because __del__ also runs on instances
+		# whose __init__ raised before self.locked was assigned (e.g. if
+		# randfrange() failed), which would otherwise raise
+		# AttributeError here.
+		if getattr(self, 'locked', False):
+			self.lockmgr.unlock(self.ext_start, self.ext_end)
+
 	def overlaps(self, other):
 		if (other.ext_start <= self.ext_start <= other.ext_end) or \
 		   (other.ext_start <= self.ext_end <= other.ext_end) or \
@@ -121,6 +251,8 @@ class Range:
 	def prepare_r(self):
 		self.type = 'r'
 		self.read_buf = bytearray(self.end - self.start)
+		self.lockmgr.lock(self.ext_start, self.ext_end)
+		self.locked = True
 
 	def verify_r(self, fd):
 		real_data = pread(fd, self.start, self.end)
@@ -131,6 +263,9 @@ class Range:
 
 	def prepare_w(self, fd):
 		self.type = 'w'
+		self.lockmgr.lock(self.ext_start, self.ext_end)
+		self.locked = True
+
 		self.prev_data = pread(fd, self.ext_start, self.ext_end)
 
 		self.new_data = getbytes(self.end - self.start)
@@ -173,7 +308,7 @@ class Range:
 
 class T_base:
 	"Interface for the transaction types"
-	def __init__(self, f, jf, fsize):
+	def __init__(self, f, jf, fsize, lockmgr, do_verify):
 		pass
 
 	def prepare(self):
@@ -186,13 +321,14 @@ class T_base:
 		pass
 
 class T_jwrite (T_base):
-	def __init__(self, f, jf, fsize):
+	def __init__(self, f, jf, fsize, lockmgr, do_verify):
 		self.f = f
 		self.jf = jf
 		self.fsize = fsize
+		self.do_verify = do_verify
 
 		self.maxoplen = min(int(fsize / 256), 16 * 1024 * 1024)
-		self.range = Range(self.fsize, self.maxoplen)
+		self.range = Range(self.fsize, self.maxoplen, lockmgr)
 
 	def prepare(self):
 		self.range.prepare_w(self.f)
@@ -201,13 +337,16 @@ class T_jwrite (T_base):
 		self.jf.pwrite(self.range.new_data, self.range.start)
 
 	def verify(self, write_only = False):
+		if not self.do_verify:
+			return
 		self.range.verify(self.f)
 
 class T_writeonly (T_base):
-	def __init__(self, f, jf, fsize):
+	def __init__(self, f, jf, fsize, lockmgr, do_verify):
 		self.f = f
 		self.jf = jf
 		self.fsize = fsize
+		self.do_verify = do_verify
 
 		# favour many small ops
 		self.maxoplen = 1 * 1024 * 1024
@@ -217,7 +356,7 @@ class T_writeonly (T_base):
 
 		c = 0
 		while len(self.ranges) < self.nops and c < self.nops * 1.25:
-			candidate = Range(self.fsize, self.maxoplen)
+			candidate = Range(self.fsize, self.maxoplen, lockmgr)
 			safe = True
 			for r in self.ranges:
 				if candidate.overlaps(r):
@@ -227,6 +366,10 @@ class T_writeonly (T_base):
 				self.ranges.append(candidate)
 			c += 1
 
+		# sort the transactions so there's no risk of internal
+		# deadlocks via the lock manager
+		self.ranges.sort()
+
 	def prepare(self):
 		for r in self.ranges:
 			r.prepare_w(self.f)
@@ -238,6 +381,9 @@ class T_writeonly (T_base):
 		t.commit()
 
 	def verify(self, write_only = False):
+		if not self.do_verify:
+			return
+
 		try:
 			for r in self.ranges:
 				r.verify(self.f)
@@ -250,8 +396,8 @@ class T_writeonly (T_base):
 			raise
 
 class T_readwrite (T_writeonly):
-	def __init__(self, f, jf, fsize):
-		T_writeonly.__init__(self, f, jf, fsize)
+	def __init__(self, f, jf, fsize, lockmgr, do_verify):
+		T_writeonly.__init__(self, f, jf, fsize, lockmgr, do_verify)
 		self.read_ranges = []
 
 	def prepare(self):
@@ -271,6 +417,9 @@ class T_readwrite (T_writeonly):
 		t.commit()
 
 	def verify(self, write_only = False):
+		if not self.do_verify:
+			return
+
 		try:
 			for r in self.ranges:
 				if write_only and r.type == 'r':
@@ -292,12 +441,16 @@ t_list = [T_jwrite, T_writeonly, T_readwrite]
 #
 
 class Stresser:
-	def __init__(self, fname, fsize, nops, use_fi, use_as):
+	def __init__(self, fname, fsize, nops, use_fi, use_as, output,
+			lockmgr, do_verify):
 		self.fname = fname
 		self.fsize = fsize
 		self.nops = nops
 		self.use_fi = use_fi
 		self.use_as = use_as
+		self.output = output
+		self.lockmgr = lockmgr
+		self.do_verify = do_verify
 
 		jflags = 0
 		if use_as:
@@ -395,31 +548,25 @@ class Stresser:
 
 	def run(self):
 		nfailures = 0
-		sys.stdout.write("  ")
 
 		for i in range(1, self.nops + 1):
-			sys.stdout.write(".")
-			if i % 10 == 0:
-				sys.stdout.write(" ")
-			if i % 50 == 0:
-				sys.stdout.write(" %d\n" % i)
-				sys.stdout.write("  ")
-			sys.stdout.flush()
-
 			trans = random.choice(t_list)(self.f, self.jf,
-					self.fsize)
+					self.fsize, self.lockmgr,
+					self.do_verify)
 
 			if self.use_fi:
 				r = self.apply_fork(trans)
 			else:
 				r = self.apply(trans)
-			if not r:
+
+			if r:
+				self.output.feed(success = True)
+			else:
+				self.output.feed(success = False)
 				nfailures += 1
 				r = self.reopen(trans)
 				trans.verify(write_only = True)
 
-		sys.stdout.write("\n")
-		sys.stdout.flush()
 		return nfailures
 
 
@@ -427,6 +574,41 @@ class Stresser:
 # Main
 #
 
+def run_stressers(nproc, fname, fsize, nops, use_fi, use_as, output, lockmgr,
+		do_verify):
+	"""Fork nproc stresser processes and wait for all of them.
+
+	The nops operations are split among the children. The parent collects
+	their progress through output (an OutputHandler), then reaps them.
+
+	Returns the number of children that did not exit successfully.
+	"""
+	pids = []
+	print("Launching stress test")
+	for i in range(nproc):
+		# Calculate how many operations will this child perform. The
+		# last one will work a little more so we get exactly nops.
+		# Note we prefer to work extra in the end rather than having
+		# the last process with 0 child_nops, that's why we round down
+		# instead of using round() or ceil().
+		child_nops = nops // nproc
+		if i == nproc - 1:
+			child_nops = nops - (nops // nproc) * i
+
+		output.prefork()
+		# flush so pending output is not written again by the child
+		sys.stdout.flush()
+		pid = os.fork()
+		if pid == 0:
+			# child: turn any error into a non-zero exit status right
+			# here, so the parent's waitpid() below reports it, instead
+			# of letting it propagate into our caller's code
+			output.child()
+			status = 0
+			try:
+				s = Stresser(fname, fsize, child_nops, use_fi,
+						use_as, output, lockmgr, do_verify)
+				s.run()
+			except Exception:
+				traceback.print_exc()
+				status = 1
+			sys.stdout.flush()
+			sys.exit(status)
+		else:
+			output.parent()
+			pids.append(pid)
+
+	print("Launched stress tests")
+	totalops, nfailures = output.output_loop()
+	print("Stress test completed, waiting for children")
+
+	# a child that died (e.g. on a failed verification) must not go
+	# unnoticed, so check how each one exited
+	failed = 0
+	for pid in pids:
+		_, status = os.waitpid(pid, 0)
+		if not os.WIFEXITED(status) or os.WEXITSTATUS(status) != 0:
+			failed += 1
+
+	print("  %d operations" % totalops)
+	print("  %d simulated failures" % nfailures)
+	if failed:
+		print("  %d processes failed" % failed)
+	return failed
 
 def main():
 	usage = "Use: %prog [options] <file name> <file size in Mb>"
@@ -440,6 +622,16 @@ def main():
 	parser.add_option("-a", "--as", dest = "use_as",
 		action = "store_true", default = False,
 		help = "use J_LINGER + autosync (conflicts with --fi)")
+	parser.add_option("-p", "--nproc", dest = "nproc", type = "int",
+		default = 1,
+		help = "number of processes (defaults to %default)")
+	parser.add_option("", "--no-internal-lock",
+		dest = "use_internal_locks", action = "store_false",
+		default = True,
+		help = "do not lock internally, disables verification")
+	parser.add_option("", "--no-verify", dest = "do_verify",
+		action = "store_false", default = True,
+		help = "do not perform verifications")
 
 	options, args = parser.parse_args()
 
@@ -458,14 +650,18 @@ def main():
 		print("Error: --fi and --as cannot be used together")
 		sys.exit(1)
 
-	s = Stresser(fname, fsize, options.nops, options.use_fi,
-			options.use_as)
-	print("Running stress test")
-	nfailures = s.run()
-	del s
-	print("Stress test completed")
-	print("  %d operations" % options.nops)
-	print("  %d simulated failures" % nfailures)
+	if not options.use_internal_locks:
+		options.do_verify = False
+
+	output = OutputHandler(every = 2)
+	if options.use_internal_locks:
+		lockmgr = LockManager()
+	else:
+		lockmgr = VoidLockManager()
+
+	run_stressers(options.nproc, fname, fsize, options.nops,
+			options.use_fi, options.use_as, output, lockmgr,
+			options.do_verify)
 
 	r = jfsck(fname)
 	print("Final check completed")