author | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-10-17 22:08:02 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-10-21 21:20:49 UTC |
parent | 8e9e4eddc5d1b93695bb4f2b5af1a09e47af76e1 |
internal/queue/queue.go | +18 | -10 |
internal/queue/queue_test.go | +22 | -0 |
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 7338933..74635f7 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -11,6 +11,7 @@ import ( "encoding/base64" "expvar" "fmt" + mathrand "math/rand" "os" "os/exec" "path/filepath" @@ -305,7 +306,6 @@ func (item *Item) SendLoop(q *Queue) { defer tr.Finish() tr.Printf("from %s", item.From) - var delay time.Duration for time.Since(item.CreatedAt) < giveUpAfter { // Send to all recipients that are still pending. var wg sync.WaitGroup @@ -327,7 +327,7 @@ func (item *Item) SendLoop(q *Queue) { // TODO: Consider sending a non-final notification after 30m or so, // that some of the messages have been delayed. - delay = nextDelay(delay) + delay := nextDelay(item.CreatedAt) tr.Printf("waiting for %v", delay) maillog.QueueLoop(item.ID, delay) time.Sleep(delay) @@ -452,17 +452,25 @@ func sendDSN(tr *trace.Trace, q *Queue, item *Item) { dsnQueued.Add(1) } -func nextDelay(last time.Duration) time.Duration { +func nextDelay(createdAt time.Time) time.Duration { + var delay time.Duration + + since := time.Since(createdAt) switch { - case last < 1*time.Minute: - return 1 * time.Minute - case last < 5*time.Minute: - return 5 * time.Minute - case last < 10*time.Minute: - return 10 * time.Minute + case since < 1*time.Minute: + delay = 1 * time.Minute + case since < 5*time.Minute: + delay = 5 * time.Minute + case since < 10*time.Minute: + delay = 10 * time.Minute default: - return 20 * time.Minute + delay = 20 * time.Minute } + + // Perturb the delay, to avoid all queued emails to be retried at the + // exact same time after a restart. + delay += time.Duration(mathrand.Intn(60)) * time.Second + return delay } func timestampNow() *timestamp.Timestamp { diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index 0815a25..5a37cd5 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -263,3 +263,25 @@ func TestPipes(t *testing.T) { t.Errorf("pipe delivery failed: %v", err) } } + +func TestNextDelay(t *testing.T) { + cases := []struct{ since, min time.Duration }{ + {10 * time.Second, 1 * time.Minute}, + {3 * time.Minute, 5 * time.Minute}, + {7 * time.Minute, 10 * time.Minute}, + {15 * time.Minute, 20 * time.Minute}, + {30 * time.Minute, 20 * time.Minute}, + } + for _, c := range cases { + // Repeat each case a few times to exercise the perturbation a bit. + for i := 0; i < 10; i++ { + delay := nextDelay(time.Now().Add(-c.since)) + + max := c.min + 1*time.Minute + if delay < c.min || delay > max { + t.Errorf("since:%v expected [%v, %v], got %v", + c.since, c.min, max, delay) + } + } + } +}