git » chasquid » commit c00648b

queue: Calculate next delay based on creation time

author Alberto Bertogli
2016-10-17 22:08:02 UTC
committer Alberto Bertogli
2016-10-21 21:20:49 UTC
parent 8e9e4eddc5d1b93695bb4f2b5af1a09e47af76e1

queue: Calculate next delay based on creation time

Calculating the next delay based on the previous delay means that every
daemon restart resets the backoff from scratch, as we don't persist the
previous delay.

As a result, a few server restarts can generate many unnecessary sends.

This patch changes the next delay calculation to use the creation time
instead, and also adds a random perturbation of less than 1m to avoid all
queued emails being retried at exactly the same time after a restart.

internal/queue/queue.go +18 -10
internal/queue/queue_test.go +22 -0

diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 7338933..74635f7 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -11,6 +11,7 @@ import (
 	"encoding/base64"
 	"expvar"
 	"fmt"
+	mathrand "math/rand"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -305,7 +306,6 @@ func (item *Item) SendLoop(q *Queue) {
 	defer tr.Finish()
 	tr.Printf("from %s", item.From)
 
-	var delay time.Duration
 	for time.Since(item.CreatedAt) < giveUpAfter {
 		// Send to all recipients that are still pending.
 		var wg sync.WaitGroup
@@ -327,7 +327,7 @@ func (item *Item) SendLoop(q *Queue) {
 		// TODO: Consider sending a non-final notification after 30m or so,
 		// that some of the messages have been delayed.
 
-		delay = nextDelay(delay)
+		delay := nextDelay(item.CreatedAt)
 		tr.Printf("waiting for %v", delay)
 		maillog.QueueLoop(item.ID, delay)
 		time.Sleep(delay)
@@ -452,17 +452,25 @@ func sendDSN(tr *trace.Trace, q *Queue, item *Item) {
 	dsnQueued.Add(1)
 }
 
-func nextDelay(last time.Duration) time.Duration {
+func nextDelay(createdAt time.Time) time.Duration {
+	var delay time.Duration
+
+	since := time.Since(createdAt)
 	switch {
-	case last < 1*time.Minute:
-		return 1 * time.Minute
-	case last < 5*time.Minute:
-		return 5 * time.Minute
-	case last < 10*time.Minute:
-		return 10 * time.Minute
+	case since < 1*time.Minute:
+		delay = 1 * time.Minute
+	case since < 5*time.Minute:
+		delay = 5 * time.Minute
+	case since < 10*time.Minute:
+		delay = 10 * time.Minute
 	default:
-		return 20 * time.Minute
+		delay = 20 * time.Minute
 	}
+
+	// Perturb the delay, to avoid all queued emails to be retried at the
+	// exact same time after a restart.
+	delay += time.Duration(mathrand.Intn(60)) * time.Second
+	return delay
 }
 
 func timestampNow() *timestamp.Timestamp {
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index 0815a25..5a37cd5 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -263,3 +263,25 @@ func TestPipes(t *testing.T) {
 		t.Errorf("pipe delivery failed: %v", err)
 	}
 }
+
+func TestNextDelay(t *testing.T) {
+	cases := []struct{ since, min time.Duration }{
+		{10 * time.Second, 1 * time.Minute},
+		{3 * time.Minute, 5 * time.Minute},
+		{7 * time.Minute, 10 * time.Minute},
+		{15 * time.Minute, 20 * time.Minute},
+		{30 * time.Minute, 20 * time.Minute},
+	}
+	for _, c := range cases {
+		// Repeat each case a few times to exercise the perturbation a bit.
+		for i := 0; i < 10; i++ {
+			delay := nextDelay(time.Now().Add(-c.since))
+
+			max := c.min + 1*time.Minute
+			if delay < c.min || delay > max {
+				t.Errorf("since:%v  expected [%v, %v], got %v",
+					c.since, c.min, max, delay)
+			}
+		}
+	}
+}