git » chasquid » commit 58de5a6

Add a skeleton queue

author Alberto Bertogli
2015-10-31 16:48:01 UTC
committer Alberto Bertogli
2015-11-06 10:27:05 UTC
parent 701f359634ee18563a80e33f8d99275d1ec140b2

Add a skeleton queue

This patch introduces a basic, in-memory queue that only holds emails for now.

This slows down the benchmarks because we don't yet have a way to wait for
delivery (even a fake one); that will come in later patches.
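
For reference, this is roughly how the new package gets used, going only by
the API the diff below introduces (queue.New and Queue.Put); the addresses
and message contents here are made up:

package main

import (
	"log"

	"blitiri.com.ar/go/chasquid/internal/queue"
)

func main() {
	// Create the in-memory queue. Nothing is persisted yet.
	q := queue.New()

	// Put returns as soon as the item is accepted; delivery (currently
	// faked) happens asynchronously in the item's send loop.
	id, err := q.Put("from@example.com", []string{"to@example.com"},
		[]byte("Subject: hi\r\n\r\nhello\r\n"))
	if err != nil {
		// With this commit, the only failure mode is the queue being full.
		log.Fatalf("enqueue failed: %v", err)
	}
	log.Printf("queued as %s", id)
}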

chasquid.go +23 -1
chasquid_test.go +6 -0
internal/queue/queue.go +174 -0
internal/queue/queue_test.go +70 -0

diff --git a/chasquid.go b/chasquid.go
index 222a84c..98dade2 100644
--- a/chasquid.go
+++ b/chasquid.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"blitiri.com.ar/go/chasquid/internal/config"
+	"blitiri.com.ar/go/chasquid/internal/queue"
 	"blitiri.com.ar/go/chasquid/internal/systemd"
 
 	_ "net/http/pprof"
@@ -119,12 +120,16 @@ type Server struct {
 
 	// Time we wait for command round-trips (excluding DATA).
 	commandTimeout time.Duration
+
+	// Queue where we put incoming mail.
+	queue *queue.Queue
 }
 
 func NewServer() *Server {
 	return &Server{
 		connTimeout:    20 * time.Minute,
 		commandTimeout: 1 * time.Minute,
+		queue:          queue.New(),
 	}
 }
 
@@ -211,6 +216,7 @@ func (s *Server) serve(l net.Listener) {
 			tlsConfig:      s.tlsConfig,
 			deadline:       time.Now().Add(s.connTimeout),
 			commandTimeout: s.commandTimeout,
+			queue:          s.queue,
 		}
 		go sc.Handle()
 	}
@@ -244,6 +250,9 @@ type Conn struct {
 	// When we should close this connection, no matter what.
 	deadline time.Time
 
+	// Queue where we put incoming mails.
+	queue *queue.Queue
+
 	// Time we wait for network operations.
 	commandTimeout time.Duration
 }
@@ -321,6 +330,7 @@ loop:
 
 	if err != nil {
 		tr.LazyPrintf("exiting with error: %v", err)
+		tr.SetError()
 	}
 }
 
@@ -396,8 +406,8 @@ func (c *Conn) MAIL(params string) (code int, msg string) {
 
 	// Note some servers check (and fail) if we had a previous MAIL command,
 	// but that's not according to the RFC. We reset the envelope instead.
-
 	c.resetEnvelope()
+
 	c.mail_from = e.Address
 	return 250, "You feel like you are being watched"
 }
@@ -463,6 +473,18 @@ func (c *Conn) DATA(params string, tr trace.Trace) (code int, msg string) {
 	tr.LazyPrintf("-> ... %d bytes of data", len(c.data))
 
 	// TODO: here is where we queue/send/process the message!
+	// There are no partial failures here: we put it in the queue, and then if
+	// individual deliveries fail, we report via email.
+	// TODO: this should queue, not send, the message.
+	// TODO: trace this.
+	msgID, err := c.queue.Put(c.mail_from, c.rcpt_to, c.data)
+	if err != nil {
+		tr.LazyPrintf("   error queueing: %v", err)
+		tr.SetError()
+		return 554, fmt.Sprintf("Failed to enqueue message: %v", err)
+	}
+
+	tr.LazyPrintf("   ... queued: %q", msgID)
 
 	// It is very important that we reset the envelope before returning,
 	// so clients can send other emails right away without needing to RSET.
diff --git a/chasquid_test.go b/chasquid_test.go
index 160024f..9676297 100644
--- a/chasquid_test.go
+++ b/chasquid_test.go
@@ -224,6 +224,9 @@ func BenchmarkManyEmails(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		sendEmail(b, c)
+
+		// TODO: Make sendEmail() wait for delivery, and remove this.
+		time.Sleep(10 * time.Millisecond)
 	}
 }
 
@@ -234,6 +237,9 @@ func BenchmarkManyEmailsParallel(b *testing.B) {
 
 		for pb.Next() {
 			sendEmail(b, c)
+
+			// TODO: Make sendEmail() wait for delivery, and remove this.
+			time.Sleep(100 * time.Millisecond)
 		}
 	})
 }
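
As the TODOs say, these sleeps are a stopgap until sendEmail() can wait for
delivery. Purely as a sketch of one possible direction (not part of this
commit; Len and waitForEmptyQueue are names I'm assuming), the queue could
expose its size and the benchmarks could poll it until it drains:

// Hypothetical addition to internal/queue (which already imports time).
// Len returns the number of items currently queued.
func (q *Queue) Len() int {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return len(q.q)
}

// A test helper could then replace the fixed sleeps.
func waitForEmptyQueue(q *Queue, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if q.Len() == 0 {
			return true
		}
		time.Sleep(time.Millisecond)
	}
	return false
}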
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
new file mode 100644
index 0000000..538077b
--- /dev/null
+++ b/internal/queue/queue.go
@@ -0,0 +1,174 @@
+// Package queue implements our email queue.
+// Accepted envelopes get put in the queue, and processed asynchronously.
+package queue
+
+import (
+	"crypto/rand"
+	"encoding/base64"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	"golang.org/x/net/trace"
+)
+
+const (
+	// Maximum size of the queue; we reject emails when we hit this.
+	maxQueueSize = 200
+
+	// Give up sending attempts after this duration.
+	giveUpAfter = 12 * time.Hour
+)
+
+var (
+	queueFullError = fmt.Errorf("Queue size too big, try again later")
+)
+
+// Channel used to get random IDs for items in the queue.
+var newID chan string
+
+func generateNewIDs() {
+	// IDs are base64(8 random bytes), but the code doesn't care.
+	var err error
+	buf := make([]byte, 8)
+	id := ""
+	for {
+		_, err = rand.Read(buf)
+		if err != nil {
+			panic(err)
+		}
+
+		id = base64.RawURLEncoding.EncodeToString(buf)
+		newID <- id
+	}
+
+}
+
+func init() {
+	newID = make(chan string, 4)
+	go generateNewIDs()
+}
+
+// Queue that keeps mail waiting for delivery.
+type Queue struct {
+	// Items in the queue. Map of id -> Item.
+	q map[string]*Item
+
+	// Mutex protecting q.
+	mu sync.RWMutex
+}
+
+// TODO: Store the queue on disk.
+// Load the queue and launch the sending loops on startup.
+func New() *Queue {
+	return &Queue{
+		q: map[string]*Item{},
+	}
+}
+
+// Put an envelope in the queue.
+func (q *Queue) Put(from string, to []string, data []byte) (string, error) {
+	if len(q.q) >= maxQueueSize {
+		return "", queueFullError
+	}
+
+	item := &Item{
+		ID:      <-newID,
+		From:    from,
+		To:      to,
+		Data:    data,
+		Created: time.Now(),
+		Results: map[string]error{},
+	}
+	q.mu.Lock()
+	q.q[item.ID] = item
+	q.mu.Unlock()
+
+	glog.Infof("Queue accepted %s  from %q", item.ID, from)
+
+	// Begin to send it right away.
+	go item.SendLoop(q)
+
+	return item.ID, nil
+}
+
+// Remove an item from the queue.
+func (q *Queue) Remove(id string) {
+	q.mu.Lock()
+	delete(q.q, id)
+	q.mu.Unlock()
+}
+
+// TODO: http handler for dumping the queue.
+// Register it in main().
+
+// An item in the queue.
+// This must be easily serializable, so no pointers.
+type Item struct {
+	// Item ID. Uniquely identifies this item.
+	ID string
+
+	// The envelope for this item.
+	From string
+	To   []string
+	Data []byte
+
+	// Creation time.
+	Created time.Time
+
+	// Next attempt to send.
+	NextAttempt time.Time
+
+	// Map of recipient -> last result of sending it.
+	Results map[string]error
+}
+
+func (item *Item) SendLoop(q *Queue) {
+	defer q.Remove(item.ID)
+
+	tr := trace.New("Queue", item.ID)
+	defer tr.Finish()
+	tr.LazyPrintf("from: %s", item.From)
+
+	for time.Since(item.Created) < giveUpAfter {
+		// Send to all recipients that are still pending.
+		successful := 0
+		for _, to := range item.To {
+			if err, ok := item.Results[to]; ok && err == nil {
+				// Successful send for this recipient, nothing to do.
+				successful++
+				continue
+			}
+
+			tr.LazyPrintf("%s sending", to)
+			glog.Infof("%s %q -> %q", item.ID, item.From, to)
+
+			// TODO: deliver, serially or in parallel with a waitgroup.
+			// Fake a successful send for now.
+			item.Results[to] = nil
+			successful++
+
+			tr.LazyPrintf("%s successful", to)
+		}
+
+		if successful == len(item.To) {
+			// Successfully sent to all recipients.
+			glog.Infof("%s all successful", item.ID)
+			return
+		}
+
+		// TODO: Consider sending a non-final notification after 30m or so,
+		// that some of the messages have been delayed.
+
+		// TODO: Next attempt incremental wrt. previous one.
+		// Do 3m, 5m, 10m, 15m, 40m, 60m, 2h, 5h, 12h, perturbed.
+		// Put a table and function below, to change this easily.
+		// We should track the duration of the previous one too? Or computed
+		// based on created?
+		time.Sleep(3 * time.Minute)
+
+	}
+
+	// TODO: Send a notification message for the recipients we failed to send.
+}
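
The retry TODO in SendLoop above already spells out a schedule (3m, 5m, 10m,
15m, 40m, 60m, 2h, 5h, 12h, perturbed). A possible shape for that table and
helper, only as a sketch of what the comment describes (the names and the
±20% jitter are my assumptions, not part of this commit):

package queue

import (
	mrand "math/rand" // aliased: queue.go imports crypto/rand as rand
	"time"
)

// Delays between send attempts; later attempts reuse the last entry.
var retryDelays = []time.Duration{
	3 * time.Minute,
	5 * time.Minute,
	10 * time.Minute,
	15 * time.Minute,
	40 * time.Minute,
	60 * time.Minute,
	2 * time.Hour,
	5 * time.Hour,
	12 * time.Hour,
}

// nextDelay returns the delay before attempt number `attempt` (0-based),
// perturbed by up to ±20% so retries from many items don't line up.
func nextDelay(attempt int) time.Duration {
	if attempt >= len(retryDelays) {
		attempt = len(retryDelays) - 1
	}
	d := retryDelays[attempt]
	factor := 0.8 + mrand.Float64()*0.4
	return time.Duration(float64(d) * factor)
}

SendLoop would then index into this by attempt count instead of sleeping a
flat 3 minutes between passes.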
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
new file mode 100644
index 0000000..06a9f9f
--- /dev/null
+++ b/internal/queue/queue_test.go
@@ -0,0 +1,70 @@
+package queue
+
+import (
+	"bytes"
+	"testing"
+	"time"
+)
+
+func TestBasic(t *testing.T) {
+	q := New()
+
+	id, err := q.Put("from", []string{"to"}, []byte("data"))
+	if err != nil {
+		t.Fatalf("Put: %v", err)
+	}
+
+	if len(id) < 6 {
+		t.Errorf("short ID: %v", id)
+	}
+
+	q.mu.RLock()
+	item := q.q[id]
+	q.mu.RUnlock()
+
+	// TODO: There's a race because the item may finish the loop before we
+	// poll it from the queue, and we would get a nil item in that case.
+	// We have to live with this for now, and will close it later once we
+	// implement deliveries.
+	if item == nil {
+		t.Logf("hit item race, nothing else to do")
+		return
+	}
+
+	if item.From != "from" || item.To[0] != "to" ||
+		!bytes.Equal(item.Data, []byte("data")) {
+		t.Errorf("different item: %#v", item)
+	}
+}
+
+func TestFullQueue(t *testing.T) {
+	q := New()
+
+	// Force-insert maxQueueSize items in the queue.
+	oneID := ""
+	for i := 0; i < maxQueueSize; i++ {
+		item := &Item{
+			ID:      <-newID,
+			From:    "from",
+			To:      []string{"to"},
+			Data:    []byte("data"),
+			Created: time.Now(),
+			Results: map[string]error{},
+		}
+		q.q[item.ID] = item
+		oneID = item.ID
+	}
+
+	// This one should fail due to the queue being too big.
+	id, err := q.Put("from", []string{"to"}, []byte("data"))
+	if err != queueFullError {
+		t.Errorf("Not failed as expected: %v - %v", id, err)
+	}
+
+	// Remove one, and try again: it should succeed.
+	q.Remove(oneID)
+	_, err = q.Put("from", []string{"to"}, []byte("data"))
+	if err != nil {
+		t.Errorf("Put: %v", err)
+	}
+}