author | Alberto Bertogli
<albertito@blitiri.com.ar> 2015-10-31 16:48:01 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2015-11-06 10:27:05 UTC |
parent | 701f359634ee18563a80e33f8d99275d1ec140b2 |
chasquid.go | +23 | -1 |
chasquid_test.go | +6 | -0 |
internal/queue/queue.go | +174 | -0 |
internal/queue/queue_test.go | +70 | -0 |
diff --git a/chasquid.go b/chasquid.go index 222a84c..98dade2 100644 --- a/chasquid.go +++ b/chasquid.go @@ -17,6 +17,7 @@ import ( "time" "blitiri.com.ar/go/chasquid/internal/config" + "blitiri.com.ar/go/chasquid/internal/queue" "blitiri.com.ar/go/chasquid/internal/systemd" _ "net/http/pprof" @@ -119,12 +120,16 @@ type Server struct { // Time we wait for command round-trips (excluding DATA). commandTimeout time.Duration + + // Queue where we put incoming mail. + queue *queue.Queue } func NewServer() *Server { return &Server{ connTimeout: 20 * time.Minute, commandTimeout: 1 * time.Minute, + queue: queue.New(), } } @@ -211,6 +216,7 @@ func (s *Server) serve(l net.Listener) { tlsConfig: s.tlsConfig, deadline: time.Now().Add(s.connTimeout), commandTimeout: s.commandTimeout, + queue: s.queue, } go sc.Handle() } @@ -244,6 +250,9 @@ type Conn struct { // When we should close this connection, no matter what. deadline time.Time + // Queue where we put incoming mails. + queue *queue.Queue + // Time we wait for network operations. commandTimeout time.Duration } @@ -321,6 +330,7 @@ loop: if err != nil { tr.LazyPrintf("exiting with error: %v", err) + tr.SetError() } } @@ -396,8 +406,8 @@ func (c *Conn) MAIL(params string) (code int, msg string) { // Note some servers check (and fail) if we had a previous MAIL command, // but that's not according to the RFC. We reset the envelope instead. - c.resetEnvelope() + c.mail_from = e.Address return 250, "You feel like you are being watched" } @@ -463,6 +473,18 @@ func (c *Conn) DATA(params string, tr trace.Trace) (code int, msg string) { tr.LazyPrintf("-> ... %d bytes of data", len(c.data)) // TODO: here is where we queue/send/process the message! + // There are no partial failures here: we put it in the queue, and then if + // individual deliveries fail, we report via email. + // Note the queue only lives in memory for now (see the TODO in queue.New), + // so queued messages are lost if the server restarts.
+ msgID, err := c.queue.Put(c.mail_from, c.rcpt_to, c.data) + if err != nil { + tr.LazyPrintf(" error queueing: %v", err) + tr.SetError() + return 554, fmt.Sprintf("Failed to enqueue message: %v", err) + } + + tr.LazyPrintf(" ... queued: %q", msgID) // It is very important that we reset the envelope before returning, // so clients can send other emails right away without needing to RSET. diff --git a/chasquid_test.go b/chasquid_test.go index 160024f..9676297 100644 --- a/chasquid_test.go +++ b/chasquid_test.go @@ -224,6 +224,9 @@ func BenchmarkManyEmails(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { sendEmail(b, c) + + // TODO: Make sendEmail() wait for delivery, and remove this. + time.Sleep(10 * time.Millisecond) } } @@ -234,6 +237,9 @@ func BenchmarkManyEmailsParallel(b *testing.B) { for pb.Next() { sendEmail(b, c) + + // TODO: Make sendEmail() wait for delivery, and remove this. + time.Sleep(100 * time.Millisecond) } }) } diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 0000000..538077b --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,174 @@ +// Package queue implements our email queue. +// Accepted envelopes get put in the queue, and processed asynchronously. +package queue + +import ( + "crypto/rand" + "encoding/base64" + "fmt" + "sync" + "time" + + "github.com/golang/glog" + "golang.org/x/net/trace" +) + +const ( + // Maximum size of the queue; we reject emails when we hit this. + maxQueueSize = 200 + + // Give up sending attempts after this duration. + giveUpAfter = 12 * time.Hour +) + +var ( + queueFullError = fmt.Errorf("Queue size too big, try again later") +) + +// Channel used to get random IDs for items in the queue. +var newID chan string + +func generateNewIDs() { + // IDs are base64(8 random bytes), but the code doesn't care.
+ var err error + buf := make([]byte, 8) + id := "" + for { + _, err = rand.Read(buf) + if err != nil { + panic(err) + } + + id = base64.RawURLEncoding.EncodeToString(buf) + newID <- id + } + +} + +func init() { + newID = make(chan string, 4) + go generateNewIDs() +} + +// Queue that keeps mail waiting for delivery. +type Queue struct { + // Items in the queue. Map of id -> Item. + q map[string]*Item + + // Mutex protecting q. + mu sync.RWMutex +} + +// TODO: Store the queue on disk. +// Load the queue and launch the sending loops on startup. +func New() *Queue { + return &Queue{ + q: map[string]*Item{}, + } +} + +// Put an envelope in the queue and start delivering it in the background. It returns the new item's ID, or queueFullError if the queue already holds maxQueueSize items; the size check and the insert happen under the same q.mu lock, so concurrent Puts and Removes cannot race on q.q or overshoot the limit. +func (q *Queue) Put(from string, to []string, data []byte) (string, error) { + item := &Item{ + ID: <-newID, + From: from, + To: to, + Data: data, + Created: time.Now(), + Results: map[string]error{}, + } + q.mu.Lock() + if len(q.q) >= maxQueueSize { + q.mu.Unlock() + return "", queueFullError + } + q.q[item.ID] = item + q.mu.Unlock() + + glog.Infof("Queue accepted %s from %q", item.ID, from) + + // Begin to send it right away. + go item.SendLoop(q) + + return item.ID, nil +} + +// Remove an item from the queue. +func (q *Queue) Remove(id string) { + q.mu.Lock() + delete(q.q, id) + q.mu.Unlock() +} + +// TODO: http handler for dumping the queue. +// Register it in main(). + +// An item in the queue. +// This must be easily serializable, so no pointers. +type Item struct { + // Item ID. Uniquely identifies this item. + ID string + + // The envelope for this item. + From string + To []string + Data []byte + + // Creation time. + Created time.Time + + // Next attempt to send. + NextAttempt time.Time + + // Map of recipient -> last result of sending it.
+ Results map[string]error +} + +func (item *Item) SendLoop(q *Queue) { + defer q.Remove(item.ID) + + tr := trace.New("Queue", item.ID) + defer tr.Finish() + tr.LazyPrintf("from: %s", item.From) + + for time.Since(item.Created) < giveUpAfter { + // Send to all recipients that are still pending. + successful := 0 + for _, to := range item.To { + if err, ok := item.Results[to]; ok && err == nil { + // Successful send for this recipient, nothing to do. + successful++ + continue + } + + tr.LazyPrintf("%s sending", to) + glog.Infof("%s %q -> %q", item.ID, item.From, to) + + // TODO: deliver, serially or in parallel with a waitgroup. + // Fake a successful send for now. + item.Results[to] = nil + successful++ + + tr.LazyPrintf("%s successful", to) + } + + if successful == len(item.To) { + // Successfully sent to all recipients. + glog.Infof("%s all successful", item.ID) + return + } + + // TODO: Consider sending a non-final notification after 30m or so, + // that some of the messages have been delayed. + + // TODO: Next attempt incremental wrt. previous one. + // Do 3m, 5m, 10m, 15m, 40m, 60m, 2h, 5h, 12h, perturbed. + // Put a table and function below, to change this easily. + // We should track the duration of the previous one too? Or computed + // based on created? + time.Sleep(3 * time.Minute) + + } + + // TODO: Send a notification message for the recipients we failed to send.
+} diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go new file mode 100644 index 0000000..06a9f9f --- /dev/null +++ b/internal/queue/queue_test.go @@ -0,0 +1,70 @@ +package queue + +import ( + "bytes" + "testing" + "time" +) + +func TestBasic(t *testing.T) { + q := New() + + id, err := q.Put("from", []string{"to"}, []byte("data")) + if err != nil { + t.Fatalf("Put: %v", err) + } + + if len(id) < 6 { + t.Errorf("short ID: %v", id) + } + + q.mu.RLock() + item := q.q[id] + q.mu.RUnlock() + + // TODO: There's a race because the item may finish the loop before we + // poll it from the queue, and we would get a nil item in that case. + // We have to live with this for now, and will close it later once we + // implement deliveries. + if item == nil { + t.Logf("hit item race, nothing else to do") + return + } + + if item.From != "from" || item.To[0] != "to" || + !bytes.Equal(item.Data, []byte("data")) { + t.Errorf("different item: %#v", item) + } +} + +func TestFullQueue(t *testing.T) { + q := New() + + // Force-insert maxQueueSize items in the queue. + oneID := "" + for i := 0; i < maxQueueSize; i++ { + item := &Item{ + ID: <-newID, + From: "from", + To: []string{"to"}, + Data: []byte("data"), + Created: time.Now(), + Results: map[string]error{}, + } + q.q[item.ID] = item + oneID = item.ID + } + + // This one should fail due to the queue being too big. + id, err := q.Put("from", []string{"to"}, []byte("data")) + if err != queueFullError { + t.Errorf("Not failed as expected: %v - %v", id, err) + } + + // Remove one, and try again: it should succeed. + q.Remove(oneID) + _, err = q.Put("from", []string{"to"}, []byte("data")) + if err != nil { + t.Errorf("Put: %v", err) + } +}