commit 04ee88c

author    Alberto Bertogli  2009-10-31 20:07:46 UTC
committer Alberto Bertogli  2009-10-31 20:07:46 UTC
parent    c16ee766aba058ffb65860766bbc7d2fcec12d0e

WIP: dm-csum v2

Signed-off-by: Alberto Bertogli <albertito@blitiri.com.ar>

drivers/md/dm-csum.c +404 -148

diff --git a/drivers/md/dm-csum.c b/drivers/md/dm-csum.c
index c77e1e6efe5..809cd1c7d71 100644
--- a/drivers/md/dm-csum.c
+++ b/drivers/md/dm-csum.c
@@ -23,8 +23,7 @@
  *
  * The CRC is, obviously, the CRC of the sector this structure refers to. The
  * flags are unused at the moment. The tag is not used by this module, but
- * made available to the upper layers through the integrity framework (TODO:
- * not implemented yet).
+ * made available to the upper layers through the integrity framework.
  *
  * The imd sector header contains a mark of the last update, so given two
  * brothers we can determine which one is younger.
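
Since struct imd_tuple and struct imd_sector_header are defined outside the
hunks shown in this diff, here is a minimal sketch of their presumed layout,
inferred from how the code below uses them (crc_ccitt() returns a u16, the
integrity profile's tag_size is sizeof(u16), and the header carries the
last-update mark). The real definitions may differ:

	struct imd_tuple {
		u16 crc;	/* CRC-CCITT of the data sector */
		u16 flags;	/* currently unused */
		u16 tag;	/* opaque tag, exposed via bio-integrity */
	} __attribute__ ((packed));

	struct imd_sector_header {
		u8 last_updated;	/* update mark, bumped by 2 per write */
		/* ... presumably also a CRC of the imd sector itself ... */
	} __attribute__ ((packed));
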
@@ -47,7 +46,7 @@
  *    Let's assume M1 is newer than M2.
  *  - Update the M2 buffer to mark it as newer, and update the new data's CRC.
  *  - Submit the write to M2, and then the write to the data, using a barrier
- *    to make sure the metadata is updated _after_ the data.
+ *    to make sure the metadata is updated _before_ the data.
  *
  * Accordingly, the read operations are handled as follows:
  *
@@ -59,8 +58,9 @@
  *    the read involves multiple sectors, it is possible that some of the
  *    correct CRCs are in M1 and some in M2.
  *
- * This scheme assumes that writes to a given sector can fail without
- * affecting other sectors.
+ * This scheme assumes that single-sector writes are atomic even in the
+ * presence of a crash.
+ * XXX: is this a reasonable assumption?
  *
  * TODO: would it be better to have M1 and M2 apart, to improve the chances of
  * recovery in case of a failure?
@@ -71,7 +71,6 @@
  * A last_updated counter is stored in the imd sector header and used to
  * find out if the given sector is newer than its brother. When writing out an
  * imd sector, we will increase its count by 2.
- * TODO: complete this comment.
  *
  *
  * Code overview
@@ -91,8 +90,8 @@
  *  - Generic and miscellaneous code, including the csum_c structure used to
  *    track a single csum device, and the functions used to manipulate sector
  *    numbers.
- *  - Code to handle the integrity metadata information, such as loading from
- *    a buffer and validation.
+ *  - bio-integrity extensions.
+ *  - imd generation and verification.
  *  - Read handling (remember: only for nice bios).
  *  - Write handling (idem).
  *  - Work queue.
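
Given the last-update mark, "which brother is younger" reduces to a wrapping
counter comparison. A minimal sketch of the idea, assuming an 8-bit mark (the
real older_imd() used later in this patch is defined outside the diff):

	/* Illustrative only: return the older of two imd sector headers, or
	 * NULL if the marks are inconsistent (they must never be equal, since
	 * each sector's mark is bumped by 2 on every write). */
	static struct imd_sector_header *older_imd_sketch(
			struct imd_sector_header *m1,
			struct imd_sector_header *m2)
	{
		s8 d = (s8) (m1->last_updated - m2->last_updated);

		if (d == 0)
			return NULL;
		return d < 0 ? m1 : m2;
	}

The signed difference keeps the comparison correct across counter wraparound,
as long as the two marks never drift more than half the counter range apart.
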
@@ -345,6 +344,7 @@ start:
 
 		if (unlikely(bytes_needed)) {
 			/* we need to go through the next bvec */
+			dprint("next bvec\n");
 			bvec_kunmap_irq(data, &flags);
 			continue;
 		}
@@ -366,17 +366,231 @@ start:
 	}
 }
 
+
+/*
+ * bio-integrity extensions
+ */
+
+#ifdef CONFIG_BLK_DEV_INTEGRITY
+
+static void imd_generate(struct blk_integrity_exchg *bix)
+{
+	unsigned int i;
+	void *buf = bix->data_buf;
+	struct imd_tuple *imd = bix->prot_buf;
+
+	/* dprint("imd_gen(): s:%llu ss:%u ds:%u\n",
+			(unsigned long long) bix->sector, bix->sector_size,
+			bix->data_size); */
+
+	for (i = 0; i < bix->data_size; i += bix->sector_size) {
+		imd->crc = crc_ccitt(0, buf, bix->sector_size);
+		imd->tag = 0;
+		imd->flags = 0;
+
+		buf += bix->sector_size;
+		imd++;
+	}
+}
+
+static int imd_verify(struct blk_integrity_exchg *bix)
+{
+	unsigned int i;
+	void *buf = bix->data_buf;
+	struct imd_tuple *imd = bix->prot_buf;
+	u16 crc;
+	sector_t sector = bix->sector;
+
+	/* dprint("imd_vfy(): s:%llu ss:%u ds:%u\n", (unsigned long long) sector,
+			bix->sector_size, bix->data_size); */
+
+	for (i = 0; i < bix->data_size; i += bix->sector_size) {
+		crc = crc_ccitt(0, buf, bix->sector_size);
+		if (crc != imd->crc) {
+			printk(KERN_ERR "%s: checksum error on sector %llu"
+					" - disk:%04x imd:%04x\n",
+					bix->disk_name,
+					(unsigned long long) sector, crc,
+					imd->crc);
+			dprint("verify: d:%p p:%p imd:%p\n", bix->data_buf,
+					bix->prot_buf, imd);
+			return -EIO;
+		}
+
+		buf += bix->sector_size;
+		imd++;
+		sector++;
+	}
+
+	return 0;
+}
+
+static void imd_get_tag(void *prot, void *tag_buf, unsigned int sectors)
+{
+	unsigned int i;
+	struct imd_tuple *imd = prot;
+	u16 *tag = tag_buf;
+
+	for (i = 0; i < sectors; i++) {
+		*tag = imd->tag;
+		tag++;
+		imd++;
+	}
+}
+
+static void imd_set_tag(void *prot, void *tag_buf, unsigned int sectors)
+{
+	unsigned int i;
+	struct imd_tuple *imd = prot;
+	u16 *tag = tag_buf;
+
+	for (i = 0; i < sectors; i++) {
+		imd->tag = *tag;
+		tag++;
+		imd++;
+	}
+}
+
+static struct blk_integrity integrity_profile = {
+	.name           = "LINUX-DMCSUM-V0-CCITT",
+	.generate_fn    = imd_generate,
+	.verify_fn      = imd_verify,
+	.get_tag_fn     = imd_get_tag,
+	.set_tag_fn     = imd_set_tag,
+	.tuple_size     = sizeof(struct imd_tuple),
+	.tag_size       = sizeof(u16),
+};
+
+static int bi_register(struct dm_target *ti)
+{
+	struct mapped_device *md;
+	struct gendisk *disk;
+
+	md = dm_table_get_md(ti->table);
+	disk = dm_disk(md);
+
+	return blk_integrity_register(disk, &integrity_profile);
+}
+
+static void bi_unregister(struct dm_target *ti)
+{
+	struct mapped_device *md;
+	struct gendisk *disk;
+
+	md = dm_table_get_md(ti->table);
+	disk = dm_disk(md);
+
+	blk_integrity_unregister(disk);
+}
+
+/* Copy the given buffer into the given bip */
+static void copy_to_bip(struct bio_integrity_payload *bip,
+		const unsigned char *buf, unsigned int size)
+{
+	unsigned int i;
+	unsigned int advance;
+	unsigned long flags;
+	struct bio_vec *bvec;
+
+	bip_for_each_vec(bvec, bip, i) {
+		unsigned char *data = bvec_kmap_irq(bvec, &flags);
+
+		advance = min(bvec->bv_len, size);
+
+		memcpy(data, buf, advance);
+
+		buf += advance;
+		size -= advance;
+
+		bvec_kunmap_irq(data, &flags);
+
+		if (size == 0)
+			break;
+	}
+}
+
+/* Set bio's integrity information taking it from imd_bio */
+static void set_bi_from_imd(struct bio *bio, struct bio *imd_bio)
+{
+	unsigned long flags;
+	struct imd_tuple *t;
+	struct bio_integrity_payload *bip = bio->bi_integrity;
+	unsigned char *imd_buf;
+
+	imd_buf = bvec_kmap_irq(bio_iovec(imd_bio), &flags);
+
+	t = (struct imd_tuple *) (imd_buf + sizeof(struct imd_sector_header));
+	t += bio->bi_sector % SECTORS_PER_IMD;
+
+	copy_to_bip(bip, (unsigned char *) t,
+			bio_sectors(bio) * sizeof(struct imd_tuple));
+
+	bvec_kunmap_irq(imd_buf, &flags);
+}
+
+/* Updates bio's integrity information at the given position, taking it from
+ * the given imd tuple */
+static void update_bi_info(struct bio *bio, unsigned int pos,
+		struct imd_tuple *tuple)
+{
+	unsigned long flags;
+	unsigned char *bip_buf;
+	struct imd_tuple *t;
+
+	BUG_ON(bio_integrity(bio) == 0);
+
+	bip_buf = bvec_kmap_irq(bip_vec(bio->bi_integrity), &flags);
+
+	BUG_ON(bip_buf == NULL);
+
+	t = (struct imd_tuple *) bip_buf;
+	t += pos;
+	t->crc = tuple->crc;
+	t->tag = tuple->tag;
+	t->flags = tuple->flags;
+
+	bvec_kunmap_irq(bip_buf, &flags);
+}
+#else /* BLK_DEV_INTEGRITY */
+
+static int bi_register(struct dm_target *ti)
+{
+	return 0;
+}
+
+static void bi_unregister(struct dm_target *ti)
+{
+	return;
+}
+
+static void set_bi_from_imd(struct bio *bio, struct bio *imd_bio)
+{
+	return;
+}
+
+static void update_bi_info(struct bio *bio, unsigned int pos,
+		struct imd_tuple *tuple)
+{
+	return;
+}
+#endif /* BLK_DEV_INTEGRITY */
+
+
+/*
+ * imd generation and verification
+ */
+
 /* Update the imd information for the given data bio. The function deals with
  * the imd bio directly, that holds one page with both imd sectors (M1 and
  * M2), as returned from prepare_imd_read(), and assumes it's been read from
  * disk (so it will only update what's needed).
  *
+ * Modifies imd_bio so it only writes the sector needed.
+ *
  * Returns:
  *  - 0 on success
  *  - -1 if there was a memory error
  *  - -2 if there was a consistency error
- *
- * TODO: make imd_bio write just the sector we need
  */
 static int update_imd_bio(const struct bio *data_bio, struct bio *imd_bio)
 {
@@ -421,19 +635,28 @@ static int update_imd_bio(const struct bio *data_bio, struct bio *imd_bio)
 
 	kfree(crc);
 
+	imd_bio->bi_size = 512;
+	bio_iovec(imd_bio)->bv_len = 512;
+	if (older == m2) {
+		imd_bio->bi_sector++;
+		bio_iovec(imd_bio)->bv_offset = 512;
+	}
+
 	return 0;
 }
 
 /* Verify that the CRCs from data_bio match the ones stored in imd_bio (which
- * contains both M1 and M2) */
+ * contains both M1 and M2), and update data_bio integrity information (if
+ * there is any) */
+/* TODO: choose a better name */
 static int verify_crc(struct bio *data_bio, struct bio *imd_bio)
 {
 	int i, r;
 	u16 *crc;
 	unsigned long flags;
 	unsigned char *imd_buf;
-	struct imd_sector_header *m1, *m2;
-	struct imd_tuple *m1t, *m2t;
+	struct imd_sector_header *m1, *m2, *older, *newer;
+	struct imd_tuple *nt, *ot;
 
 	crc = kmalloc(sizeof(u16) * bio_sectors(data_bio), GFP_NOIO);
 	if (crc == NULL)
@@ -446,42 +669,57 @@ static int verify_crc(struct bio *data_bio, struct bio *imd_bio)
 	m1 = (struct imd_sector_header *) imd_buf;
 	m2 = (struct imd_sector_header *) (imd_buf + 512);
 
-	m1t = (struct imd_tuple *) (m1 + 1);
-	m1t = m1t + data_bio->bi_sector % SECTORS_PER_IMD;
+	older = older_imd(m1, m2);
+	if (older == NULL) {
+		printk(KERN_WARNING "dm-csum: couldn't find older\n");
+		r = -EIO;
+		goto exit;
+	}
 
-	m2t = (struct imd_tuple *) (m2 + 1);
-	m2t = m2t + data_bio->bi_sector % SECTORS_PER_IMD;
+	newer = m1;
+	if (older == m1)
+		newer = m2;
+
+	nt = (struct imd_tuple *) (newer + 1);
+	nt += data_bio->bi_sector % SECTORS_PER_IMD;
+	ot = (struct imd_tuple *) (older + 1);
+	ot += data_bio->bi_sector % SECTORS_PER_IMD;
 
 	r = 0;
 
+	BUG_ON(bio_sectors(data_bio) > SECTORS_PER_IMD);
+
 	for (i = 0; i < bio_sectors(data_bio); i++) {
-		WARN_ON(i >= SECTORS_PER_IMD);
-		if (m1t->crc != *(crc + i) && m2t->crc != *(crc + i)) {
+		if (nt->crc == *(crc + i)) {
+			update_bi_info(data_bio, i, nt);
+		} else if (ot->crc == *(crc + i)) {
+			update_bi_info(data_bio, i, ot);
+
+			/* dprint("no match from new\n");
+			dprint(" new: %d %04x\n", newer->last_updated,
+					nt->crc);
+			dprint(" old: %d %04x\n", older->last_updated,
+					ot->crc);
+			dprint(" real: %04x\n", *(crc + i)); */
+		} else {
 			printk(KERN_WARNING
 				"dm-csum: CRC error at sector %lld\n",
 				(unsigned long long)
 					(data_bio->bi_sector + i));
-			/*
-			dprint("CRC: %llu m1:%x m2:%x r:%x\n",
+			dprint("CRC: %llu o:%x n:%x r:%x\n",
 					(unsigned long long)
 						(data_bio->bi_sector + i),
-					m1t->crc, m2t->crc, *(crc + i));
-			*/
+					ot->crc, nt->crc, *(crc + i));
 			r = -EIO;
 			break;
-		} else {
-			/*
-			dprint("OK : %llu m1:%x m2:%x r:%x\n",
-				(unsigned long long) (data_bio->bi_sector + i),
-				m1t->crc, m2t->crc, *(crc + i));
-			*/
 		}
-		m1t++;
-		m2t++;
+		nt++;
+		ot++;
 	}
 
 	/* TODO: validate the imd sector CRC */
 
+exit:
 	bvec_kunmap_irq(imd_buf, &flags);
 
 	kfree(crc);
@@ -490,6 +728,10 @@ static int verify_crc(struct bio *data_bio, struct bio *imd_bio)
 }
 
 
+/* Work queue where the read/write processing code is run.
+ * TODO: Unify with the submission workqueue once we have decent locking. */
+static struct workqueue_struct *io_wq;
+
 /*
  * READ handling (nice bios only)
  *
@@ -500,8 +742,6 @@ static int verify_crc(struct bio *data_bio, struct bio *imd_bio)
 
 /* Used to track pending reads */
 struct pending_read {
-	spinlock_t lock;
-
 	struct dm_target *ti;
 	struct csum_c *cc;
 	struct bio *orig_bio;
@@ -509,21 +749,20 @@ struct pending_read {
 	struct bio *data_bio;
 	struct bio *imd_bio;
 
-	bool data_error;
-	bool imd_error;
+	bool error;
 
 	/* number of operations pending */
-	unsigned int nr_pending;
-};
+	atomic_t nr_pending;
 
-#define pr_lock(pr) spin_lock(&((pr)->lock))
-#define pr_unlock(pr) spin_unlock(&((pr)->lock))
+	struct work_struct work;
+};
 
 static void read_nice_bio(struct dm_target *ti, struct bio *bio);
 static struct bio *prepare_data_read(struct bio *orig_bio,
 		struct block_device *bdev, sector_t sector, bio_end_io_t *cb,
 		void *private);
-static void read_complete(struct bio *bio, int error);
+static void queue_read_complete(struct bio *bio, int error);
+static void read_complete(struct work_struct *work);
 
 /* Read a nice bio */
 static void read_nice_bio(struct dm_target *ti, struct bio *bio)
@@ -535,26 +774,24 @@ static void read_nice_bio(struct dm_target *ti, struct bio *bio)
 	if (pr == NULL)
 		goto error;
 
-	spin_lock_init(&pr->lock);
 	pr->ti = ti;
 	pr->cc = cc;
 	pr->orig_bio = bio;
-	pr->data_error = false;
-	pr->imd_error = false;
+	pr->error = false;
 
 	pr->data_bio = prepare_data_read(pr->orig_bio, cc->data_dev->bdev,
 			cc->map_data_sector(ti, pr->orig_bio->bi_sector),
-			read_complete, pr);
+			queue_read_complete, pr);
 	if (pr->data_bio == NULL)
 		goto error;
 
 	pr->imd_bio = prepare_imd_read(cc->imd_dev->bdev,
 			cc->get_imd_sector(ti, pr->orig_bio->bi_sector),
-			read_complete, pr);
+			queue_read_complete, pr);
 	if (pr->imd_bio == NULL)
 		goto error;
 
-	pr->nr_pending = 2;
+	atomic_set(&pr->nr_pending, 2);
 
 	submit_bio(pr->data_bio->bi_rw, pr->data_bio);
 	submit_bio(pr->imd_bio->bi_rw, pr->imd_bio);
@@ -586,41 +823,41 @@ static struct bio *prepare_data_read(struct bio *orig_bio,
 	return bio;
 }
 
-/* Read completion callback */
-static void read_complete(struct bio *bio, int error)
+static void queue_read_complete(struct bio *bio, int error)
 {
-	int result = -EIO;
 	struct pending_read *pr = bio->bi_private;
 
-	pr_lock(pr);
-
-	pr->nr_pending--;
+	if (error)
+		pr->error = true;
 
-	if (pr->nr_pending > 0) {
-		pr_unlock(pr);
+	if (!atomic_dec_and_test(&pr->nr_pending))
 		return;
-	}
 
-	/* we know we have exclusive access to the pr, no need to lock
-	 * around it anymore */
-	pr_unlock(pr);
+	/* defer the completion so it's not run in interrupt context */
+	INIT_WORK(&(pr->work), read_complete);
+	queue_work(io_wq, &(pr->work));
+}
 
-	if (pr->data_error) {
-		printk(KERN_WARNING "dm-csum: error reading data\n");
-		goto exit;
-	} else if (pr->imd_error) {
-		printk(KERN_WARNING "dm-csum: error reading imd\n");
-		goto exit;
-	}
+static void read_complete(struct work_struct *work)
+{
+	int result = -EIO;
+	struct pending_read *pr;
 
-	/* TODO: add an option for those who do not want the bio to fail on
-	 * CRC errors */
-	result = verify_crc(pr->orig_bio, pr->imd_bio);
+	pr = container_of(work, struct pending_read, work);
 
-exit:
 	/* TODO: use decent locking */
 	up(&wq_lock);
 
+	/* verify_crc() not only verifies the CRC, but also updates orig_bio's
+	 * integrity information.
+	 * TODO: add an option for those who do not want the bio to fail on
+	 * CRC errors */
+	/* XXX: should we update bip on failed bios? */
+	result = verify_crc(pr->orig_bio, pr->imd_bio);
+
+	if (pr->error)
+		result = -EIO;
+
 	/* free the page allocated in prepare_imd_read() */
 	__free_page(pr->imd_bio->bi_io_vec->bv_page);
 
@@ -634,9 +871,6 @@ exit:
 	kfree(pr);
 }
 
-#undef pr_lock
-#undef pr_unlock
-
 
 /*
  * WRITE handling (nice bios only)
@@ -644,8 +878,6 @@ exit:
 
 /* Used to track pending writes */
 struct pending_write {
-	spinlock_t lock;
-
 	struct dm_target *ti;
 	struct csum_c *cc;
 
@@ -654,20 +886,23 @@ struct pending_write {
 	struct bio *data_bio;
 
 	bool error;
-	unsigned int nr_pending;
-};
+	atomic_t nr_pending;
 
-#define pw_lock(pw) spin_lock(&((pw)->lock))
-#define pw_unlock(pw) spin_unlock(&((pw)->lock))
+	struct work_struct work1;
+	struct work_struct work2;
+};
 
 /* Writes begin with write_nice_bio(), that queues the imd bio read. When that
  * bio is done, write_stage1() gets called, which updates the imd data and
  * then queues both the imd write and the data write. When those are
  * completed, write_stage2() gets called, which finishes up and ends the
- * original bio. */
+ * original bio. To avoid running the completion code in interrupt context,
+ * the stage functions run through a workqueue. */
 static void write_nice_bio(struct dm_target *ti, struct bio *bio);
-static void write_stage1(struct bio *bio, int error);
-static void write_stage2(struct bio *bio, int error);
+static void queue_write_stage1(struct bio *bio, int error);
+static void write_stage1(struct work_struct *work);
+static void queue_write_stage2(struct bio *bio, int error);
+static void write_stage2(struct work_struct *work);
 
 /* Write a nice bio */
 static void write_nice_bio(struct dm_target *ti, struct bio *bio)
@@ -681,17 +916,16 @@ static void write_nice_bio(struct dm_target *ti, struct bio *bio)
 		return;
 	}
 
-	spin_lock_init(&pw->lock);
 	pw->ti = ti;
 	pw->cc = cc;
 	pw->orig_bio = bio;
 	pw->data_bio = NULL;
 	pw->error = false;
-	pw->nr_pending = 0;
+	atomic_set(&pw->nr_pending, 0);
 
 	pw->imd_bio = prepare_imd_read(cc->imd_dev->bdev,
 			cc->get_imd_sector(ti, pw->orig_bio->bi_sector),
-			write_stage1, pw);
+			queue_write_stage1, pw);
 	if (pw->imd_bio == NULL) {
 		kfree(pw);
 		bio_endio(bio, -ENOMEM);
@@ -701,17 +935,32 @@ static void write_nice_bio(struct dm_target *ti, struct bio *bio)
 	submit_bio(pw->imd_bio->bi_rw, pw->imd_bio);
 }
 
-/* Called when the imd bio has been read */
-static void write_stage1(struct bio *imd_bio, int error)
+static void queue_write_stage1(struct bio *imd_bio, int error)
+{
+	struct pending_write *pw = imd_bio->bi_private;
+
+	if (error)
+		pw->error = true;
+
+	INIT_WORK(&(pw->work1), write_stage1);
+	queue_work(io_wq, &(pw->work1));
+}
+
+static void write_stage1(struct work_struct *work)
 {
 	int r;
 	int err = -EIO;
-	struct pending_write *pw = imd_bio->bi_private;
-	struct csum_c *cc = pw->cc;
-	struct dm_target *ti = pw->ti;
 	struct bio *data_bio;
+	struct pending_write *pw;
+
+	pw = container_of(work, struct pending_write, work1);
+
+	/* dprint("write stage 1 %llu\n",
+			(unsigned long long) pw->orig_bio->bi_sector); */
 
-	r = update_imd_bio(pw->orig_bio, imd_bio);
+	if (pw->error)
+		goto error;
+
+	r = update_imd_bio(pw->orig_bio, pw->imd_bio);
 	if (r == -1) {
 		err = -ENOMEM;
 		goto error;
@@ -723,8 +972,8 @@ static void write_stage1(struct bio *imd_bio, int error)
 	}
 
 	/* prepare bio for reuse */
-	imd_bio->bi_rw |= WRITE;
-	imd_bio->bi_end_io = write_stage2;
+	pw->imd_bio->bi_rw |= WRITE;
+	pw->imd_bio->bi_end_io = queue_write_stage2;
 
 	data_bio = bio_clone(pw->orig_bio, GFP_NOIO);
 	if (data_bio == NULL) {
@@ -733,56 +982,61 @@ static void write_stage1(struct bio *imd_bio, int error)
 	}
 
 	data_bio->bi_private = pw;
-	data_bio->bi_end_io = write_stage2;
-	data_bio->bi_bdev = cc->data_dev->bdev;
-	data_bio->bi_sector = cc->map_data_sector(ti, pw->orig_bio->bi_sector);
+	data_bio->bi_end_io = queue_write_stage2;
+	data_bio->bi_bdev = pw->cc->data_dev->bdev;
+	data_bio->bi_sector = pw->cc->map_data_sector(pw->ti,
+			pw->orig_bio->bi_sector);
 
 	/* data bio takes a barrier, so we know the imd write will have
 	 * completed before it hits the disk */
+	/* TODO: the underlying device might not support barriers
+	 * TODO: when data and imd are on separate devices, the barrier trick
+	 * is no longer useful */
 	data_bio->bi_rw |= (1 << BIO_RW_BARRIER);
 
 	pw->data_bio = data_bio;
 
-	/* TODO: when data and imd are on separate devices, the barrier trick
-	 * is no longer useful */
-
 	/* submit both bios at the end to simplify error handling; remember
 	 * the order is very important because of the barrier */
-	pw->nr_pending = 2;
-	submit_bio(imd_bio->bi_rw, imd_bio);
+	atomic_set(&pw->nr_pending, 2);
+	submit_bio(pw->imd_bio->bi_rw, pw->imd_bio);
 	submit_bio(data_bio->bi_rw, data_bio);
 	return;
 
 error:
 	bio_endio(pw->orig_bio, err);
-	__free_page(imd_bio->bi_io_vec->bv_page);
-	bio_put(imd_bio);
+	__free_page(pw->imd_bio->bi_io_vec->bv_page);
+	bio_put(pw->imd_bio);
 	kfree(pw);
 	return;
 }
 
-static void write_stage2(struct bio *bio, int error)
+static void queue_write_stage2(struct bio *bio, int error)
 {
 	struct pending_write *pw = bio->bi_private;
 
-	pw_lock(pw);
-	pw->nr_pending--;
 	if (error)
 		pw->error = true;
 
-	if (pw->nr_pending) {
-		pw_unlock(pw);
-		//dprint(" w s2 -\n");
+	if (!atomic_dec_and_test(&pw->nr_pending))
 		return;
-	}
 
-	/* if we got here, there's no need to lock pw anymore as we know we
-	 * have exclusive access */
-	pw_unlock(pw);
+	INIT_WORK(&(pw->work2), write_stage2);
+	queue_work(io_wq, &(pw->work2));
+}
+
+static void write_stage2(struct work_struct *work)
+{
+	struct pending_write *pw;
+
+	pw = container_of(work, struct pending_write, work2);
 
 	/* TODO: use decent locking */
 	up(&wq_lock);
 
+	if (bio_integrity(pw->orig_bio))
+		set_bi_from_imd(pw->orig_bio, pw->imd_bio);
+
 	/* free the imd_bio resources */
 	__free_page(pw->imd_bio->bi_io_vec->bv_page);
 	bio_put(pw->imd_bio);
@@ -796,21 +1050,20 @@ static void write_stage2(struct bio *bio, int error)
 	kfree(pw);
 }
 
-#undef pw_lock
-#undef pw_unlock
-
 
 /*
  * Work queue to process bios.
  *
- * It is created in dm_csum_init().
+ * It is created in dm_csum_init() and handles the bios queued by
+ * queue_nice_bio(); the final stages of bio processing (read_complete(),
+ * write_stage1() and write_stage2()) run in io_wq.
  *
  * TODO: handle more than one pending bio, and dispatch more than one as long
  * as they don't overlap. Maybe one workqueue per ctx? Or maybe delay the
  * creation of the workqueue until the first ctx?
  */
 
-static struct workqueue_struct *wq;
+static struct workqueue_struct *submit_wq;
 
 struct pending_work {
 	struct dm_target *ti;
@@ -863,7 +1116,7 @@ static int queue_nice_bio(struct dm_target *ti, struct bio *bio)
 
 	INIT_WORK(&(pending->w), process_nice_bio);
 
-	queue_work(wq, &(pending->w));
+	queue_work(submit_wq, &(pending->w));
 
 	return 0;
 }
@@ -884,7 +1137,8 @@ static int bio_is_evil(struct dm_target *ti, struct bio *bio)
 {
 	sector_t mapped_first, mapped_last;
 
-	/* TODO: add a comment on why we do this */
+	/* A bio is evil when it crosses an imd area: in that case the mapped
+	 * distance between its first and last sectors is larger than the
+	 * distance between them in the original bio */
 	mapped_first = map_data_sector_same(ti, bio->bi_sector);
 	mapped_last = map_data_sector_same(ti,
 			bio->bi_sector + bio_sectors(bio) - 1);
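
To make the check above concrete, here is a sketch of the mapping, under the
assumption that each group of SECTORS_PER_IMD data sectors is followed by one
imd area (the M1/M2 pair; IMD_AREA_SECTORS is a name made up here, presumably
2). The real map_data_sector_same() is defined outside this diff and may
differ:

	/* Illustrative only: the mapping "stretches" every time it crosses a
	 * group boundary, because an imd area sits between the groups. */
	static sector_t map_data_sector_sketch(sector_t s)
	{
		return s + (s / SECTORS_PER_IMD) * IMD_AREA_SECTORS;
	}

With such a layout, a bio whose first and last sectors fall in different
groups maps to a span wider than the bio itself, which is what bio_is_evil()
detects.
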
@@ -895,24 +1149,19 @@ static int bio_is_evil(struct dm_target *ti, struct bio *bio)
 
 /* Used to track pending evil bios */
 struct pending_evil_bio {
-	spinlock_t lock;
-
 	struct csum_c *cc;
 
 	/* original evil bio */
 	struct bio *orig_bio;
 
 	/* number of bios pending */
-	unsigned int nr_pending;
+	atomic_t nr_pending;
 
 	/* were there any errors? */
 	bool error;
 
 };
 
-#define peb_lock(peb) spin_lock(&((peb)->lock))
-#define peb_unlock(peb) spin_unlock(&((peb)->lock))
-
 static int handle_evil_bio(struct dm_target *ti, struct bio *bio);
 static struct bio *prepare_nice_bio(struct pending_evil_bio *peb,
 		struct bio *bio, sector_t begin, sector_t size);
@@ -923,7 +1172,7 @@ static int handle_evil_bio(struct dm_target *ti, struct bio *bio)
 {
 	int i, r;
 	sector_t first, last, prelude, postlude;
-	unsigned int nmiddle, submitted_bios;
+	unsigned int nmiddle, submitted_bios, expected_bios;
 	struct pending_evil_bio *peb;
 	struct bio *new;
 
@@ -939,7 +1188,6 @@ static int handle_evil_bio(struct dm_target *ti, struct bio *bio)
 	if (peb == NULL)
 		return -ENOMEM;
 
-	spin_lock_init(&peb->lock);
 	peb->orig_bio = bio;
 	peb->error = false;
 	peb->cc = ti->private;
@@ -966,7 +1214,8 @@ static int handle_evil_bio(struct dm_target *ti, struct bio *bio)
 	/* How many whole SECTORS_PER_IMD groups lie between the two cuts */
 	nmiddle = ( (last - postlude) - (first + prelude) ) / SECTORS_PER_IMD;
 
-	peb->nr_pending = 1 + nmiddle + 1;
+	expected_bios = 1 + nmiddle + 1;
+	atomic_set(&peb->nr_pending, expected_bios);
 
 	/*
 	dprint("  first:%lu last:%lu pre:%lu nm:%lu post:%lu pending:%lu\n",
@@ -1027,18 +1276,14 @@ prepare_error:
 	 * in-flight bios that have been submitted and will call
 	 * evil_bio_complete() when they're done; decrement the expected
 	 * number of bios, and check if we're already done */
-	peb_lock(peb);
-	peb->nr_pending -= peb->nr_pending - submitted_bios;
 	peb->error = true;
 
-	if (peb->nr_pending == 0) {
-		peb_unlock(peb);
+	if (atomic_sub_return(expected_bios - submitted_bios,
+				&peb->nr_pending) == 0) {
 		kfree(peb);
 		return -ENOMEM;
 	}
 
-	peb_unlock(peb);
-
 	return 0;
 }
 
@@ -1082,6 +1327,11 @@ static struct bio *prepare_nice_bio(struct pending_evil_bio *peb,
 	new->bi_private = peb;
 	new->bi_end_io = evil_bio_complete;
 
+	/* trim it so that the new bip_vec (which is shared with the original
+	 * bio) points to the right offset */
+	if (bio_integrity(bio))
+		bio_integrity_trim(new, begin - bio->bi_sector, size);
+
 	return new;
 }
 
@@ -1089,18 +1339,12 @@ static void evil_bio_complete(struct bio *bio, int error)
 {
 	struct pending_evil_bio *peb = bio->bi_private;
 
-	peb_lock(peb);
-
-	peb->nr_pending--;
 	if (error)
 		peb->error = true;
 
-	if (peb->nr_pending == 0) {
+	if (atomic_dec_and_test(&peb->nr_pending)) {
 		bio_endio(peb->orig_bio, peb->error ? -EIO : 0);
-		peb_unlock(peb);
 		kfree(peb);
-	} else {
-		peb_unlock(peb);
 	}
 
 	/* put the bio created with bio_clone() because we no longer care
@@ -1108,9 +1352,6 @@ static void evil_bio_complete(struct bio *bio, int error)
 	bio_put(bio);
 }
 
-#undef peb_lock
-#undef peb_unlock
-
 
 /*
  * Device mapper
@@ -1187,6 +1428,11 @@ static int csum_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 
 	ti->private = cc;
 
+	if (bi_register(ti) != 0) {
+		ti->error = "Couldn't register with bio-integrity";
+		goto error;
+	}
+
 	return 0;
 
 error:
@@ -1207,6 +1453,8 @@ static void csum_dtr(struct dm_target *ti)
 {
 	struct csum_c *cc = ti->private;
 
+	bi_unregister(ti);
+
 	if (cc->data_dev == cc->imd_dev) {
 		dm_put_device(ti, cc->data_dev);
 	} else {
@@ -1252,14 +1500,21 @@ static int __init dm_csum_init(void)
 {
 	int dm_rv;
 
-	wq = create_workqueue("dm-csum");
-	if (wq == NULL)
+	submit_wq = create_workqueue("dm-csum-s");
+	if (submit_wq == NULL)
 		return -ENOMEM;
 
+	io_wq = create_workqueue("dm-csum-io");
+	if (io_wq == NULL) {
+		destroy_workqueue(submit_wq);
+		return -ENOMEM;
+	}
+
 	dm_rv = dm_register_target(&csum_target);
 	if (dm_rv < 0) {
 		DMERR("register failed: %d", dm_rv);
-		destroy_workqueue(wq);
+		destroy_workqueue(submit_wq);
+		destroy_workqueue(io_wq);
 		return dm_rv;
 	}
 
@@ -1269,7 +1524,8 @@ static int __init dm_csum_init(void)
 static void __exit dm_csum_exit(void)
 {
 	dm_unregister_target(&csum_target);
-	destroy_workqueue(wq);
+	destroy_workqueue(submit_wq);
+	destroy_workqueue(io_wq);
 }
 
 module_init(dm_csum_init)
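
A hypothetical usage sketch, for illustration only. The target name and the
ctr argument format are guesses based on csum_ctr() acquiring a data device
and an imd device (possibly the same one); they are not taken from this patch:

	# create a dm-csum device over /dev/sdb1, with the integrity metadata
	# interleaved on the same device
	dmsetup create csum0 --table "0 $NR_SECTORS csum /dev/sdb1 /dev/sdb1"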