author | Alberto Bertogli <albertito@blitiri.com.ar> | 2015-11-13 01:35:13 UTC
committer | Alberto Bertogli <albertito@blitiri.com.ar> | 2015-11-13 03:41:06 UTC
parent | 77d547288f4dd361c84a6fb062f24d3df3ae370f |
internal/queue/queue.go | +22 | -12 |
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 7bbd877..94ea847 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -146,26 +146,36 @@ func (item *Item) SendLoop(q *Queue) {
 	var err error
 	for time.Since(item.Created) < giveUpAfter {
 		// Send to all recipients that are still pending.
-		successful := 0
+		var wg sync.WaitGroup
 		for _, to := range item.To {
 			if err, ok := item.Results[to]; ok && err == nil {
 				// Successful send for this recipient, nothing to do.
-				successful++
 				continue
 			}
 
-			tr.LazyPrintf("%s sending", to)
+			wg.Add(1)
+			go func(to string) {
+				defer wg.Done()
+				tr.LazyPrintf("%s sending", to)
+
+				err = q.courier.Deliver(item.From, to, item.Data)
+				item.Results[to] = err
+
+				if err != nil {
+					tr.LazyPrintf("error: %v", err)
+					glog.Infof("%s -> %q fail: %v", item.ID, to, err)
+				} else {
+					tr.LazyPrintf("%s successful", to)
+					glog.Infof("%s -> %q sent", item.ID, to)
+				}
+			}(to)
+		}
+		wg.Wait()
 
-			// TODO: deliver, serially or in parallel with a waitgroup.
-			err = q.courier.Deliver(item.From, to, item.Data)
-			item.Results[to] = err
-			if err != nil {
-				tr.LazyPrintf("error: %v", err)
-				glog.Infof("%s -> %q fail: %v", item.ID, to, err)
-			} else {
+		successful := 0
+		for _, to := range item.To {
+			if err, ok := item.Results[to]; ok && err == nil {
 				successful++
-				tr.LazyPrintf("%s successful", to)
-				glog.Infof("%s -> %q sent", item.ID, to)
 			}
 		}