git » chasquid » commit 09d3c73

queue: Simplify sending loop

author Alberto Bertogli
2016-10-12 22:54:49 UTC
committer Alberto Bertogli
2016-10-21 21:13:39 UTC
parent c172894317bbf5a8e027fb678dc43adc4320432b

queue: Simplify sending loop

This patch simplifies the sending loop code:

 - Move the recipient sending function from a closure to a method.
 - Simplify the status update logic: we now update and write
   unconditionally (as we should have been doing).
 - Create a function for counting recipients in a given status.

It also adds a test for the removal of completed items from the queue,
which was not covered before and came up during development.

internal/queue/queue.go +55 -68
internal/queue/queue_test.go +11 -0

diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index dd043d7..d1dc9d4 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -305,60 +305,17 @@ func (item *Item) SendLoop(q *Queue) {
 		// Send to all recipients that are still pending.
 		var wg sync.WaitGroup
 		for _, rcpt := range item.Rcpt {
-			item.Lock()
-			status := rcpt.Status
-			item.Unlock()
-
-			if status != Recipient_PENDING {
+			if rcpt.Status != Recipient_PENDING {
 				continue
 			}
 
 			wg.Add(1)
-			go func(rcpt *Recipient, oldStatus Recipient_Status) {
-				defer wg.Done()
-				to := rcpt.Address
-				tr.Debugf("%s sending", to)
-
-				err, permanent := item.deliver(q, rcpt)
-
-				if err != nil {
-					if permanent {
-						tr.Errorf("%s permanent error: %v", to, err)
-						status = Recipient_FAILED
-					} else {
-						tr.Printf("%s temporary error: %v", to, err)
-					}
-				} else {
-					tr.Printf("%s sent", to)
-					status = Recipient_SENT
-				}
-
-				// Update + write on status change.
-				if oldStatus != status {
-					item.Lock()
-					rcpt.Status = status
-					if err != nil {
-						rcpt.LastFailureMessage = err.Error()
-					}
-					item.Unlock()
-
-					err = item.WriteTo(q.path)
-					if err != nil {
-						tr.Errorf("failed to write: %v", err)
-					}
-				}
-			}(rcpt, status)
+			go item.sendOneRcpt(&wg, tr, q, rcpt)
 		}
 		wg.Wait()
 
 		// If they're all done, no need to wait.
-		pending := 0
-		for _, rcpt := range item.Rcpt {
-			if rcpt.Status == Recipient_PENDING {
-				pending++
-			}
-		}
-		if pending == 0 {
+		if item.countRcpt(Recipient_PENDING) == 0 {
 			break
 		}
 
@@ -371,41 +328,41 @@ func (item *Item) SendLoop(q *Queue) {
 	}
 
 	// Completed to all recipients (some may not have succeeded).
-
-	failed := 0
-	for _, rcpt := range item.Rcpt {
-		if rcpt.Status == Recipient_FAILED {
-			failed++
-		}
-	}
-
-	if failed > 0 && item.From != "<>" {
+	if item.countRcpt(Recipient_FAILED) > 0 && item.From != "<>" {
 		sendDSN(tr, q, item)
 	}
 
 	tr.Printf("all done")
 	q.Remove(item.ID)
-
-	return
 }
 
-func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
-	tr.Debugf("sending DSN")
+// sendOneRcpt, and update it with the results.
+func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) {
+	defer wg.Done()
+	to := rcpt.Address
+	tr.Debugf("%s sending", to)
 
-	msg, err := deliveryStatusNotification(item)
+	err, permanent := item.deliver(q, rcpt)
+
+	item.Lock()
 	if err != nil {
-		tr.Errorf("failed to build DSN: %v", err)
-		return
+		rcpt.LastFailureMessage = err.Error()
+		if permanent {
+			tr.Errorf("%s permanent error: %v", to, err)
+			rcpt.Status = Recipient_FAILED
+		} else {
+			tr.Printf("%s temporary error: %v", to, err)
+		}
+	} else {
+		tr.Printf("%s sent", to)
+		rcpt.Status = Recipient_SENT
 	}
+	item.Unlock()
 
-	id, err := q.Put(item.Hostname, "<>", []string{item.From}, msg)
+	err = item.WriteTo(q.path)
 	if err != nil {
-		tr.Errorf("failed to queue DSN: %v", err)
-		return
+		tr.Errorf("failed to write: %v", err)
 	}
-
-	tr.Printf("queued DSN: %s", id)
-	dsnQueued.Add(1)
 }
 
 // deliver the item to the given recipient, using the couriers from the queue.
@@ -452,6 +409,36 @@ func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool)
 	}
 }
 
+// countRcpt counts how many recipients are in the given status.
+func (item *Item) countRcpt(status Recipient_Status) int {
+	c := 0
+	for _, rcpt := range item.Rcpt {
+		if rcpt.Status == status {
+			c++
+		}
+	}
+	return c
+}
+
+func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
+	tr.Debugf("sending DSN")
+
+	msg, err := deliveryStatusNotification(item)
+	if err != nil {
+		tr.Errorf("failed to build DSN: %v", err)
+		return
+	}
+
+	id, err := q.Put(item.Hostname, "<>", []string{item.From}, msg)
+	if err != nil {
+		tr.Errorf("failed to queue DSN: %v", err)
+		return
+	}
+
+	tr.Printf("queued DSN: %s", id)
+	dsnQueued.Add(1)
+}
+
 func nextDelay(last time.Duration) time.Duration {
 	switch {
 	case last < 1*time.Minute:
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index b77ebe1..f729f1c 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -80,6 +80,17 @@ func TestBasic(t *testing.T) {
 	localC.wg.Wait()
 	remoteC.wg.Wait()
 
+	// Make sure the delivered items leave the queue.
+	for d := time.Now().Add(2 * time.Second); time.Now().Before(d); {
+		if q.Len() == 0 {
+			break
+		}
+		time.Sleep(20 * time.Millisecond)
+	}
+	if q.Len() != 0 {
+		t.Fatalf("%d items not removed from the queue after delivery", q.Len())
+	}
+
 	cases := []struct {
 		courier    *TestCourier
 		expectedTo string