author | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-07-19 22:26:27 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-07-22 00:44:45 UTC |
parent | 362ef6f6d06de5ee583f07cac8e130051d9e6de7 |
chasquid.go | +8 | -17 |
internal/courier/courier.go | +0 | -19 |
internal/courier/courier_test.go | +0 | -44 |
internal/queue/queue.go | +26 | -7 |
internal/queue/queue_test.go | +55 | -25 |
diff --git a/chasquid.go b/chasquid.go index 0a801e7..2779290 100644 --- a/chasquid.go +++ b/chasquid.go @@ -21,6 +21,7 @@ import ( "blitiri.com.ar/go/chasquid/internal/config" "blitiri.com.ar/go/chasquid/internal/courier" "blitiri.com.ar/go/chasquid/internal/queue" + "blitiri.com.ar/go/chasquid/internal/set" "blitiri.com.ar/go/chasquid/internal/systemd" "blitiri.com.ar/go/chasquid/internal/trace" "blitiri.com.ar/go/chasquid/internal/userdb" @@ -153,14 +154,11 @@ type Server struct { tlsConfig *tls.Config // Local domains. - localDomains map[string]bool + localDomains *set.String // User databases (per domain). userDBs map[string]*userdb.DB - // Local courier. - localCourier courier.Courier - // Time before we give up on a connection, even if it's sending data. connTimeout time.Duration @@ -175,7 +173,7 @@ func NewServer() *Server { return &Server{ connTimeout: 20 * time.Minute, commandTimeout: 1 * time.Minute, - localDomains: map[string]bool{}, + localDomains: &set.String{}, userDBs: map[string]*userdb.DB{}, } } @@ -194,7 +192,7 @@ func (s *Server) AddListeners(ls []net.Listener) { } func (s *Server) AddDomain(d string) { - s.localDomains[d] = true + s.localDomains.Add(d) } func (s *Server) AddUserDB(domain string, db *userdb.DB) { @@ -227,14 +225,10 @@ func (s *Server) ListenAndServe() { glog.Fatalf("Error loading TLS config: %v", err) } - // Create the queue, giving it a routing courier for delivery. - // We need to do this early, before accepting connections. - courier := &courier.Router{ - Local: &courier.Procmail{}, - Remote: &courier.SMTP{}, - LocalDomains: s.localDomains, - } - s.queue = queue.New(courier) + // TODO: Create the queue when creating the server? + // Or even before, and just give it to the server? + s.queue = queue.New( + &courier.Procmail{}, &courier.SMTP{}, s.localDomains) for _, addr := range s.addrs { // Listen. @@ -578,11 +572,8 @@ func (c *Conn) DATA(params string, tr *trace.Trace) (code int, msg string) { tr.LazyPrintf("-> ... %d bytes of data", len(c.data)) - // TODO: here is where we queue/send/process the message! // There are no partial failures here: we put it in the queue, and then if // individual deliveries fail, we report via email. - // TODO: this should queue, not send, the message. - // TODO: trace this. msgID, err := c.queue.Put(c.mail_from, c.rcpt_to, c.data) if err != nil { tr.LazyPrintf(" error queueing: %v", err) diff --git a/internal/courier/courier.go b/internal/courier/courier.go index a6e2acd..e42a5b3 100644 --- a/internal/courier/courier.go +++ b/internal/courier/courier.go @@ -1,28 +1,9 @@ // Package courier implements various couriers for delivering messages. package courier -import "blitiri.com.ar/go/chasquid/internal/envelope" - // Courier delivers mail to a single recipient. // It is implemented by different couriers, for both local and remote // recipients. type Courier interface { Deliver(from string, to string, data []byte) error } - -// Router decides if the destination is local or remote, and delivers -// accordingly. -type Router struct { - Local Courier - Remote Courier - LocalDomains map[string]bool -} - -func (r *Router) Deliver(from string, to string, data []byte) error { - d := envelope.DomainOf(to) - if r.LocalDomains[d] { - return r.Local.Deliver(from, to, data) - } else { - return r.Remote.Deliver(from, to, data) - } -} diff --git a/internal/courier/courier_test.go b/internal/courier/courier_test.go deleted file mode 100644 index 069aa29..0000000 --- a/internal/courier/courier_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package courier - -import "testing" - -// Counter courier, for testing purposes. -type counter struct { - c int -} - -func (c *counter) Deliver(from string, to string, data []byte) error { - c.c++ - return nil -} - -func TestRouter(t *testing.T) { - localC := &counter{} - remoteC := &counter{} - r := Router{ - Local: localC, - Remote: remoteC, - LocalDomains: map[string]bool{ - "local1": true, - "local2": true, - }, - } - - for domain, c := range map[string]int{ - "local1": 1, - "local2": 2, - "remote": 9, - } { - for i := 0; i < c; i++ { - r.Deliver("from", "a@"+domain, nil) - } - } - - if localC.c != 3 { - t.Errorf("local mis-count: expected 3, got %d", localC.c) - } - - if remoteC.c != 9 { - t.Errorf("remote mis-count: expected 9, got %d", remoteC.c) - } -} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 6dc29a0..e53bf30 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -10,6 +10,8 @@ import ( "time" "blitiri.com.ar/go/chasquid/internal/courier" + "blitiri.com.ar/go/chasquid/internal/envelope" + "blitiri.com.ar/go/chasquid/internal/set" "github.com/golang/glog" "golang.org/x/net/trace" @@ -60,16 +62,22 @@ type Queue struct { // Mutex protecting q. mu sync.RWMutex - // Courier to use to deliver mail. - courier courier.Courier + // Couriers to use to deliver mail. + localC courier.Courier + remoteC courier.Courier + + // Domains we consider local. + localDomains *set.String } // TODO: Store the queue on disk. // Load the queue and launch the sending loops on startup. -func New(c courier.Courier) *Queue { +func New(localC, remoteC courier.Courier, localDomains *set.String) *Queue { return &Queue{ - q: map[string]*Item{}, - courier: c, + q: map[string]*Item{}, + localC: localC, + remoteC: remoteC, + localDomains: localDomains, } } @@ -151,7 +159,6 @@ func (item *Item) SendLoop(q *Queue) { defer tr.Finish() tr.LazyPrintf("from: %s", item.From) - var err error var delay time.Duration for time.Since(item.Created) < giveUpAfter { // Send to all recipients that are still pending. @@ -167,12 +174,24 @@ func (item *Item) SendLoop(q *Queue) { defer wg.Done() tr.LazyPrintf("%s sending", to) - err = q.courier.Deliver(item.From, to, item.Data) + var err error + // TODO: If this is all the difference we end up having + // between the two couriers, consider going back to using a + // routing courier. + if envelope.DomainIn(to, q.localDomains) { + err = q.localC.Deliver(item.From, to, item.Data) + } else { + err = q.remoteC.Deliver(item.From, to, item.Data) + } item.mu.Lock() item.Results[to] = err item.mu.Unlock() if err != nil { + // TODO: Local deliveries should not be retried, if they + // fail due to the user not existing. + // -> we need to know the users. + // Or maybe we can just not care? tr.LazyPrintf("error: %v", err) glog.Infof("%s -> %q fail: %v", item.ID, to, err) } else { diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index 119e3e3..735f1d8 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -2,12 +2,15 @@ package queue import ( "bytes" + "sync" "testing" "time" + + "blitiri.com.ar/go/chasquid/internal/set" ) -// Our own courier, for testing purposes. -// Delivery is done by sending on a channel. +// Test courier. Delivery is done by sending on a channel, so users have fine +// grain control over the results. type ChanCourier struct { requests chan deliverRequest results chan error @@ -23,19 +26,42 @@ func (cc *ChanCourier) Deliver(from string, to string, data []byte) error { cc.requests <- deliverRequest{from, to, data} return <-cc.results } - -func newCourier() *ChanCourier { +func newChanCourier() *ChanCourier { return &ChanCourier{ requests: make(chan deliverRequest), results: make(chan error), } } +// Courier for test purposes. Never fails, and always remembers everything. +type TestCourier struct { + wg sync.WaitGroup + requests []*deliverRequest + reqFor map[string]*deliverRequest +} + +func (tc *TestCourier) Deliver(from string, to string, data []byte) error { + defer tc.wg.Done() + dr := &deliverRequest{from, to, data} + tc.requests = append(tc.requests, dr) + tc.reqFor[to] = dr + return nil +} + +func newTestCourier() *TestCourier { + return &TestCourier{ + reqFor: map[string]*deliverRequest{}, + } +} + func TestBasic(t *testing.T) { - courier := newCourier() - q := New(courier) + localC := newTestCourier() + remoteC := newTestCourier() + q := New(localC, remoteC, set.NewString("loco")) - id, err := q.Put("from", []string{"to"}, []byte("data")) + localC.wg.Add(2) + remoteC.wg.Add(1) + id, err := q.Put("from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data")) if err != nil { t.Fatalf("Put: %v", err) } @@ -44,31 +70,35 @@ func TestBasic(t *testing.T) { t.Errorf("short ID: %v", id) } - q.mu.RLock() - item := q.q[id] - q.mu.RUnlock() - - if item == nil { - t.Fatalf("item not in queue, racy test?") - } + localC.wg.Wait() + remoteC.wg.Wait() - if item.From != "from" || item.To[0] != "to" || - !bytes.Equal(item.Data, []byte("data")) { - t.Errorf("different item: %#v", item) + cases := []struct { + courier *TestCourier + expectedTo string + }{ + {localC, "nodomain"}, + {localC, "am@loco"}, + {remoteC, "x@remote"}, } + for _, c := range cases { + req := c.courier.reqFor[c.expectedTo] + if req == nil { + t.Errorf("missing request for %q", c.expectedTo) + continue + } - // Test that we delivered the item. - req := <-courier.requests - courier.results <- nil - - if req.from != "from" || req.to != "to" || - !bytes.Equal(req.data, []byte("data")) { - t.Errorf("different courier request: %#v", req) + if req.from != "from" || req.to != c.expectedTo || + !bytes.Equal(req.data, []byte("data")) { + t.Errorf("wrong request for %q: %v", c.expectedTo, req) + } } } func TestFullQueue(t *testing.T) { - q := New(newCourier()) + localC := newChanCourier() + remoteC := newChanCourier() + q := New(localC, remoteC, set.NewString()) // Force-insert maxQueueSize items in the queue. oneID := ""