git » chasquid » commit 831ef13

queue: Add a mutex to protect item's results

author Alberto Bertogli
2016-07-19 22:02:42 UTC
committer Alberto Bertogli
2016-07-22 00:44:45 UTC
parent 21e69aa42f8263c0cb38dba31a22ba5818f25910

queue: Add a mutex to protect item's results

The item results get accessed in various places concurrently, so this patch
adds a mutex to protect it.

internal/queue/queue.go +12 -2

diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 14ed2b1..6dc29a0 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -134,6 +134,14 @@ type Item struct {
 
 	// Map of recipient -> last result of sending it.
 	Results map[string]error
+	mu      sync.Mutex
+}
+
+func (item *Item) resultsFor(to string) (error, bool) {
+	item.mu.Lock()
+	defer item.mu.Unlock()
+	value, ok := item.Results[to]
+	return value, ok
 }
 
 func (item *Item) SendLoop(q *Queue) {
@@ -149,7 +157,7 @@ func (item *Item) SendLoop(q *Queue) {
 		// Send to all recipients that are still pending.
 		var wg sync.WaitGroup
 		for _, to := range item.To {
-			if err, ok := item.Results[to]; ok && err == nil {
+			if err, ok := item.resultsFor(to); ok && err == nil {
 				// Successful send for this recipient, nothing to do.
 				continue
 			}
@@ -160,7 +168,9 @@ func (item *Item) SendLoop(q *Queue) {
 				tr.LazyPrintf("%s sending", to)
 
 				err = q.courier.Deliver(item.From, to, item.Data)
+				item.mu.Lock()
 				item.Results[to] = err
+				item.mu.Unlock()
 
 				if err != nil {
 					tr.LazyPrintf("error: %v", err)
@@ -175,7 +185,7 @@ func (item *Item) SendLoop(q *Queue) {
 
 		successful := 0
 		for _, to := range item.To {
-			if err, ok := item.Results[to]; ok && err == nil {
+			if err, ok := item.resultsFor(to); ok && err == nil {
 				successful++
 			}
 		}