author | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-10-12 22:54:49 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-10-21 21:13:39 UTC |
parent | c172894317bbf5a8e027fb678dc43adc4320432b |
internal/queue/queue.go | +55 | -68 |
internal/queue/queue_test.go | +11 | -0 |
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index dd043d7..d1dc9d4 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -305,60 +305,17 @@ func (item *Item) SendLoop(q *Queue) { // Send to all recipients that are still pending. var wg sync.WaitGroup for _, rcpt := range item.Rcpt { - item.Lock() - status := rcpt.Status - item.Unlock() - - if status != Recipient_PENDING { + if rcpt.Status != Recipient_PENDING { continue } wg.Add(1) - go func(rcpt *Recipient, oldStatus Recipient_Status) { - defer wg.Done() - to := rcpt.Address - tr.Debugf("%s sending", to) - - err, permanent := item.deliver(q, rcpt) - - if err != nil { - if permanent { - tr.Errorf("%s permanent error: %v", to, err) - status = Recipient_FAILED - } else { - tr.Printf("%s temporary error: %v", to, err) - } - } else { - tr.Printf("%s sent", to) - status = Recipient_SENT - } - - // Update + write on status change. - if oldStatus != status { - item.Lock() - rcpt.Status = status - if err != nil { - rcpt.LastFailureMessage = err.Error() - } - item.Unlock() - - err = item.WriteTo(q.path) - if err != nil { - tr.Errorf("failed to write: %v", err) - } - } - }(rcpt, status) + go item.sendOneRcpt(&wg, tr, q, rcpt) } wg.Wait() // If they're all done, no need to wait. - pending := 0 - for _, rcpt := range item.Rcpt { - if rcpt.Status == Recipient_PENDING { - pending++ - } - } - if pending == 0 { + if item.countRcpt(Recipient_PENDING) == 0 { break } @@ -371,41 +328,41 @@ func (item *Item) SendLoop(q *Queue) { } // Completed to all recipients (some may not have succeeded). - - failed := 0 - for _, rcpt := range item.Rcpt { - if rcpt.Status == Recipient_FAILED { - failed++ - } - } - - if failed > 0 && item.From != "<>" { + if item.countRcpt(Recipient_FAILED) > 0 && item.From != "<>" { sendDSN(tr, q, item) } tr.Printf("all done") q.Remove(item.ID) - - return } -func sendDSN(tr *trace.Trace, q *Queue, item *Item) { - tr.Debugf("sending DSN") +// sendOneRcpt, and update it with the results. +func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) { + defer wg.Done() + to := rcpt.Address + tr.Debugf("%s sending", to) - msg, err := deliveryStatusNotification(item) + err, permanent := item.deliver(q, rcpt) + + item.Lock() if err != nil { - tr.Errorf("failed to build DSN: %v", err) - return + rcpt.LastFailureMessage = err.Error() + if permanent { + tr.Errorf("%s permanent error: %v", to, err) + rcpt.Status = Recipient_FAILED + } else { + tr.Printf("%s temporary error: %v", to, err) + } + } else { + tr.Printf("%s sent", to) + rcpt.Status = Recipient_SENT } + item.Unlock() - id, err := q.Put(item.Hostname, "<>", []string{item.From}, msg) + err = item.WriteTo(q.path) if err != nil { - tr.Errorf("failed to queue DSN: %v", err) - return + tr.Errorf("failed to write: %v", err) } - - tr.Printf("queued DSN: %s", id) - dsnQueued.Add(1) } // deliver the item to the given recipient, using the couriers from the queue. @@ -452,6 +409,36 @@ func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool) } } +// countRcpt counts how many recipients are in the given status. +func (item *Item) countRcpt(status Recipient_Status) int { + c := 0 + for _, rcpt := range item.Rcpt { + if rcpt.Status == status { + c++ + } + } + return c +} + +func sendDSN(tr *trace.Trace, q *Queue, item *Item) { + tr.Debugf("sending DSN") + + msg, err := deliveryStatusNotification(item) + if err != nil { + tr.Errorf("failed to build DSN: %v", err) + return + } + + id, err := q.Put(item.Hostname, "<>", []string{item.From}, msg) + if err != nil { + tr.Errorf("failed to queue DSN: %v", err) + return + } + + tr.Printf("queued DSN: %s", id) + dsnQueued.Add(1) +} + func nextDelay(last time.Duration) time.Duration { switch { case last < 1*time.Minute: diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index b77ebe1..f729f1c 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -80,6 +80,17 @@ func TestBasic(t *testing.T) { localC.wg.Wait() remoteC.wg.Wait() + // Make sure the delivered items leave the queue. + for d := time.Now().Add(2 * time.Second); time.Now().Before(d); { + if q.Len() == 0 { + break + } + time.Sleep(20 * time.Millisecond) + } + if q.Len() != 0 { + t.Fatalf("%d items not removed from the queue after delivery", q.Len()) + } + cases := []struct { courier *TestCourier expectedTo string