git » chasquid » commit 9d172a6

Make the queue aware of local and remote couriers

author Alberto Bertogli
2016-07-19 22:26:27 UTC
committer Alberto Bertogli
2016-07-22 00:44:45 UTC
parent 362ef6f6d06de5ee583f07cac8e130051d9e6de7

Make the queue aware of local and remote couriers

The routing courier is a nice idea in theory, but at least for now, we want
the queue to be aware of when a destination is local so we can implement
differentiated logic.

This may change in the future, but at the moment it's not clear that the
abstractions will be worth it.

So this patch removes the routing courier, and makes the queue do the
routing. There is no difference in how the two couriers are handled yet;
those differences will come in subsequent patches.

chasquid.go +8 -17
internal/courier/courier.go +0 -19
internal/courier/courier_test.go +0 -44
internal/queue/queue.go +26 -7
internal/queue/queue_test.go +55 -25

diff --git a/chasquid.go b/chasquid.go
index 0a801e7..2779290 100644
--- a/chasquid.go
+++ b/chasquid.go
@@ -21,6 +21,7 @@ import (
 	"blitiri.com.ar/go/chasquid/internal/config"
 	"blitiri.com.ar/go/chasquid/internal/courier"
 	"blitiri.com.ar/go/chasquid/internal/queue"
+	"blitiri.com.ar/go/chasquid/internal/set"
 	"blitiri.com.ar/go/chasquid/internal/systemd"
 	"blitiri.com.ar/go/chasquid/internal/trace"
 	"blitiri.com.ar/go/chasquid/internal/userdb"
@@ -153,14 +154,11 @@ type Server struct {
 	tlsConfig *tls.Config
 
 	// Local domains.
-	localDomains map[string]bool
+	localDomains *set.String
 
 	// User databases (per domain).
 	userDBs map[string]*userdb.DB
 
-	// Local courier.
-	localCourier courier.Courier
-
 	// Time before we give up on a connection, even if it's sending data.
 	connTimeout time.Duration
 
@@ -175,7 +173,7 @@ func NewServer() *Server {
 	return &Server{
 		connTimeout:    20 * time.Minute,
 		commandTimeout: 1 * time.Minute,
-		localDomains:   map[string]bool{},
+		localDomains:   &set.String{},
 		userDBs:        map[string]*userdb.DB{},
 	}
 }
@@ -194,7 +192,7 @@ func (s *Server) AddListeners(ls []net.Listener) {
 }
 
 func (s *Server) AddDomain(d string) {
-	s.localDomains[d] = true
+	s.localDomains.Add(d)
 }
 
 func (s *Server) AddUserDB(domain string, db *userdb.DB) {
@@ -227,14 +225,10 @@ func (s *Server) ListenAndServe() {
 		glog.Fatalf("Error loading TLS config: %v", err)
 	}
 
-	// Create the queue, giving it a routing courier for delivery.
-	// We need to do this early, before accepting connections.
-	courier := &courier.Router{
-		Local:        &courier.Procmail{},
-		Remote:       &courier.SMTP{},
-		LocalDomains: s.localDomains,
-	}
-	s.queue = queue.New(courier)
+	// TODO: Create the queue when creating the server?
+	// Or even before, and just give it to the server?
+	s.queue = queue.New(
+		&courier.Procmail{}, &courier.SMTP{}, s.localDomains)
 
 	for _, addr := range s.addrs {
 		// Listen.
@@ -578,11 +572,8 @@ func (c *Conn) DATA(params string, tr *trace.Trace) (code int, msg string) {
 
 	tr.LazyPrintf("-> ... %d bytes of data", len(c.data))
 
-	// TODO: here is where we queue/send/process the message!
 	// There are no partial failures here: we put it in the queue, and then if
 	// individual deliveries fail, we report via email.
-	// TODO: this should queue, not send, the message.
-	// TODO: trace this.
 	msgID, err := c.queue.Put(c.mail_from, c.rcpt_to, c.data)
 	if err != nil {
 		tr.LazyPrintf("   error queueing: %v", err)
diff --git a/internal/courier/courier.go b/internal/courier/courier.go
index a6e2acd..e42a5b3 100644
--- a/internal/courier/courier.go
+++ b/internal/courier/courier.go
@@ -1,28 +1,9 @@
 // Package courier implements various couriers for delivering messages.
 package courier
 
-import "blitiri.com.ar/go/chasquid/internal/envelope"
-
 // Courier delivers mail to a single recipient.
 // It is implemented by different couriers, for both local and remote
 // recipients.
 type Courier interface {
 	Deliver(from string, to string, data []byte) error
 }
-
-// Router decides if the destination is local or remote, and delivers
-// accordingly.
-type Router struct {
-	Local        Courier
-	Remote       Courier
-	LocalDomains map[string]bool
-}
-
-func (r *Router) Deliver(from string, to string, data []byte) error {
-	d := envelope.DomainOf(to)
-	if r.LocalDomains[d] {
-		return r.Local.Deliver(from, to, data)
-	} else {
-		return r.Remote.Deliver(from, to, data)
-	}
-}
diff --git a/internal/courier/courier_test.go b/internal/courier/courier_test.go
deleted file mode 100644
index 069aa29..0000000
--- a/internal/courier/courier_test.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package courier
-
-import "testing"
-
-// Counter courier, for testing purposes.
-type counter struct {
-	c int
-}
-
-func (c *counter) Deliver(from string, to string, data []byte) error {
-	c.c++
-	return nil
-}
-
-func TestRouter(t *testing.T) {
-	localC := &counter{}
-	remoteC := &counter{}
-	r := Router{
-		Local:  localC,
-		Remote: remoteC,
-		LocalDomains: map[string]bool{
-			"local1": true,
-			"local2": true,
-		},
-	}
-
-	for domain, c := range map[string]int{
-		"local1": 1,
-		"local2": 2,
-		"remote": 9,
-	} {
-		for i := 0; i < c; i++ {
-			r.Deliver("from", "a@"+domain, nil)
-		}
-	}
-
-	if localC.c != 3 {
-		t.Errorf("local mis-count: expected 3, got %d", localC.c)
-	}
-
-	if remoteC.c != 9 {
-		t.Errorf("remote mis-count: expected 9, got %d", remoteC.c)
-	}
-}
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 6dc29a0..e53bf30 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -10,6 +10,8 @@ import (
 	"time"
 
 	"blitiri.com.ar/go/chasquid/internal/courier"
+	"blitiri.com.ar/go/chasquid/internal/envelope"
+	"blitiri.com.ar/go/chasquid/internal/set"
 
 	"github.com/golang/glog"
 	"golang.org/x/net/trace"
@@ -60,16 +62,22 @@ type Queue struct {
 	// Mutex protecting q.
 	mu sync.RWMutex
 
-	// Courier to use to deliver mail.
-	courier courier.Courier
+	// Couriers to use to deliver mail.
+	localC  courier.Courier
+	remoteC courier.Courier
+
+	// Domains we consider local.
+	localDomains *set.String
 }
 
 // TODO: Store the queue on disk.
 // Load the queue and launch the sending loops on startup.
-func New(c courier.Courier) *Queue {
+func New(localC, remoteC courier.Courier, localDomains *set.String) *Queue {
 	return &Queue{
-		q:       map[string]*Item{},
-		courier: c,
+		q:            map[string]*Item{},
+		localC:       localC,
+		remoteC:      remoteC,
+		localDomains: localDomains,
 	}
 }
 
@@ -151,7 +159,6 @@ func (item *Item) SendLoop(q *Queue) {
 	defer tr.Finish()
 	tr.LazyPrintf("from: %s", item.From)
 
-	var err error
 	var delay time.Duration
 	for time.Since(item.Created) < giveUpAfter {
 		// Send to all recipients that are still pending.
@@ -167,12 +174,24 @@ func (item *Item) SendLoop(q *Queue) {
 				defer wg.Done()
 				tr.LazyPrintf("%s sending", to)
 
-				err = q.courier.Deliver(item.From, to, item.Data)
+				var err error
+				// TODO: If this is all the difference we end up having
+				// between the two couriers, consider going back to using a
+				// routing courier.
+				if envelope.DomainIn(to, q.localDomains) {
+					err = q.localC.Deliver(item.From, to, item.Data)
+				} else {
+					err = q.remoteC.Deliver(item.From, to, item.Data)
+				}
 				item.mu.Lock()
 				item.Results[to] = err
 				item.mu.Unlock()
 
 				if err != nil {
+					// TODO: Local deliveries should not be retried, if they
+					// fail due to the user not existing.
+					// -> we need to know the users.
+					// Or maybe we can just not care?
 					tr.LazyPrintf("error: %v", err)
 					glog.Infof("%s  -> %q fail: %v", item.ID, to, err)
 				} else {
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index 119e3e3..735f1d8 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -2,12 +2,15 @@ package queue
 
 import (
 	"bytes"
+	"sync"
 	"testing"
 	"time"
+
+	"blitiri.com.ar/go/chasquid/internal/set"
 )
 
-// Our own courier, for testing purposes.
-// Delivery is done by sending on a channel.
+// Test courier. Delivery is done by sending on a channel, so users have fine
+// grain control over the results.
 type ChanCourier struct {
 	requests chan deliverRequest
 	results  chan error
@@ -23,19 +26,42 @@ func (cc *ChanCourier) Deliver(from string, to string, data []byte) error {
 	cc.requests <- deliverRequest{from, to, data}
 	return <-cc.results
 }
-
-func newCourier() *ChanCourier {
+func newChanCourier() *ChanCourier {
 	return &ChanCourier{
 		requests: make(chan deliverRequest),
 		results:  make(chan error),
 	}
 }
 
+// Courier for test purposes. Never fails, and always remembers everything.
+type TestCourier struct {
+	wg       sync.WaitGroup
+	requests []*deliverRequest
+	reqFor   map[string]*deliverRequest
+}
+
+func (tc *TestCourier) Deliver(from string, to string, data []byte) error {
+	defer tc.wg.Done()
+	dr := &deliverRequest{from, to, data}
+	tc.requests = append(tc.requests, dr)
+	tc.reqFor[to] = dr
+	return nil
+}
+
+func newTestCourier() *TestCourier {
+	return &TestCourier{
+		reqFor: map[string]*deliverRequest{},
+	}
+}
+
 func TestBasic(t *testing.T) {
-	courier := newCourier()
-	q := New(courier)
+	localC := newTestCourier()
+	remoteC := newTestCourier()
+	q := New(localC, remoteC, set.NewString("loco"))
 
-	id, err := q.Put("from", []string{"to"}, []byte("data"))
+	localC.wg.Add(2)
+	remoteC.wg.Add(1)
+	id, err := q.Put("from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data"))
 	if err != nil {
 		t.Fatalf("Put: %v", err)
 	}
@@ -44,31 +70,35 @@ func TestBasic(t *testing.T) {
 		t.Errorf("short ID: %v", id)
 	}
 
-	q.mu.RLock()
-	item := q.q[id]
-	q.mu.RUnlock()
-
-	if item == nil {
-		t.Fatalf("item not in queue, racy test?")
-	}
+	localC.wg.Wait()
+	remoteC.wg.Wait()
 
-	if item.From != "from" || item.To[0] != "to" ||
-		!bytes.Equal(item.Data, []byte("data")) {
-		t.Errorf("different item: %#v", item)
+	cases := []struct {
+		courier    *TestCourier
+		expectedTo string
+	}{
+		{localC, "nodomain"},
+		{localC, "am@loco"},
+		{remoteC, "x@remote"},
 	}
+	for _, c := range cases {
+		req := c.courier.reqFor[c.expectedTo]
+		if req == nil {
+			t.Errorf("missing request for %q", c.expectedTo)
+			continue
+		}
 
-	// Test that we delivered the item.
-	req := <-courier.requests
-	courier.results <- nil
-
-	if req.from != "from" || req.to != "to" ||
-		!bytes.Equal(req.data, []byte("data")) {
-		t.Errorf("different courier request: %#v", req)
+		if req.from != "from" || req.to != c.expectedTo ||
+			!bytes.Equal(req.data, []byte("data")) {
+			t.Errorf("wrong request for %q: %v", c.expectedTo, req)
+		}
 	}
 }
 
 func TestFullQueue(t *testing.T) {
-	q := New(newCourier())
+	localC := newChanCourier()
+	remoteC := newChanCourier()
+	q := New(localC, remoteC, set.NewString())
 
 	// Force-insert maxQueueSize items in the queue.
 	oneID := ""