author | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-07-19 22:02:42 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-07-22 00:44:45 UTC |
parent | 21e69aa42f8263c0cb38dba31a22ba5818f25910 |
internal/queue/queue.go | +12 | -2 |
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 14ed2b1..6dc29a0 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -134,6 +134,14 @@ type Item struct {
 
 	// Map of recipient -> last result of sending it.
 	Results map[string]error
+	mu      sync.Mutex
+}
+
+func (item *Item) resultsFor(to string) (error, bool) {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+	value, ok := item.Results[to]
+	return value, ok
 }
 
 func (item *Item) SendLoop(q *Queue) {
@@ -149,7 +157,7 @@ func (item *Item) SendLoop(q *Queue) {
 		// Send to all recipients that are still pending.
 		var wg sync.WaitGroup
 		for _, to := range item.To {
-			if err, ok := item.Results[to]; ok && err == nil {
+			if err, ok := item.resultsFor(to); ok && err == nil {
 				// Successful send for this recipient, nothing to do.
 				continue
 			}
@@ -160,7 +168,9 @@ func (item *Item) SendLoop(q *Queue) {
 				tr.LazyPrintf("%s sending", to)
 
 				err = q.courier.Deliver(item.From, to, item.Data)
+				item.mu.Lock()
 				item.Results[to] = err
+				item.mu.Unlock()
 
 				if err != nil {
 					tr.LazyPrintf("error: %v", err)
@@ -175,7 +185,7 @@ func (item *Item) SendLoop(q *Queue) {
 
 		successful := 0
 		for _, to := range item.To {
-			if err, ok := item.Results[to]; ok && err == nil {
+			if err, ok := item.resultsFor(to); ok && err == nil {
 				successful++
 			}
 		}