git » chasquid » commit 1d3675a

queue: Send delivery status notifications on failures

author Alberto Bertogli
2016-09-25 14:50:17 UTC
committer Alberto Bertogli
2016-10-09 23:51:04 UTC
parent 927a74aa3cf00d7524a8c1688ec30722d74c697b

queue: Send delivery status notifications on failures

When we permanently fail to deliver to one or more recipients, send delivery
status notifications back to the sender.

To do this, we need to extend a couple of internal structures, to keep track
of the original destinations (so we can include them in the message, for
reference), and the hostname we're identifying ourselves as (this is arguable
but we're going with it for now, may change later).

chasquid.go +1 -1
internal/config/config.pb.go +5 -2
internal/config/config.proto +5 -2
internal/queue/dsn.go +80 -0
internal/queue/dsn_test.go +27 -0
internal/queue/queue.go +48 -15
internal/queue/queue.pb.go +35 -28
internal/queue/queue.proto +9 -3
internal/queue/queue_test.go +14 -13
test/t-03-queue_persistency/addtoqueue.go +2 -1

diff --git a/chasquid.go b/chasquid.go
index ff33a0a..adbfae6 100644
--- a/chasquid.go
+++ b/chasquid.go
@@ -680,7 +680,7 @@ func (c *Conn) DATA(params string, tr *trace.Trace) (code int, msg string) {
 
 	// There are no partial failures here: we put it in the queue, and then if
 	// individual deliveries fail, we report via email.
-	msgID, err := c.queue.Put(c.mailFrom, c.rcptTo, c.data)
+	msgID, err := c.queue.Put(c.hostname, c.mailFrom, c.rcptTo, c.data)
 	if err != nil {
 		tr.LazyPrintf("   error queueing: %v", err)
 		tr.SetError()
diff --git a/internal/config/config.pb.go b/internal/config/config.pb.go
index 76976d8..e37e6c0 100644
--- a/internal/config/config.pb.go
+++ b/internal/config/config.pb.go
@@ -29,8 +29,11 @@ var _ = math.Inf
 const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
 
 type Config struct {
-	// Hostname to use when we say hello.
-	// For aesthetic purposes, but may help if our ip address resolves to it.
+	// Main/default hostname to use.
+	// This is used to say hello to clients, and by default as the domain
+	// we send delivery status notifications from.
+	// It should be a domain we can send email from.
+	// It usually helps if our IP address resolves to it.
 	// Default: machine hostname.
 	Hostname string `protobuf:"bytes,1,opt,name=hostname" json:"hostname,omitempty"`
 	// Maximum email size, in megabytes.
diff --git a/internal/config/config.proto b/internal/config/config.proto
index b68c159..b57e335 100644
--- a/internal/config/config.proto
+++ b/internal/config/config.proto
@@ -2,8 +2,11 @@
 syntax = "proto3";
 
 message Config {
-	// Hostname to use when we say hello.
-	// For aesthetic purposes, but may help if our ip address resolves to it.
+	// Main/default hostname to use.
+	// This is used to say hello to clients, and by default as the domain
+	// we send delivery status notifications from.
+	// It should be a domain we can send email from.
+	// It usually helps if our IP address resolves to it.
 	// Default: machine hostname.
 	string hostname = 1;
 
diff --git a/internal/queue/dsn.go b/internal/queue/dsn.go
new file mode 100644
index 0000000..0300b3e
--- /dev/null
+++ b/internal/queue/dsn.go
@@ -0,0 +1,80 @@
+package queue
+
+import (
+	"bytes"
+	"text/template"
+	"time"
+)
+
+// maxOrigMsgLen caps the bytes of the original message quoted in a DSN.
+const maxOrigMsgLen = 256 * 1024
+
+// deliveryStatusNotification renders the delivery status notification (DSN)
+// email for the given item and returns it; queueing is left to the caller.
+//
+// RFC 3464 (https://tools.ietf.org/html/rfc3464) defines a standard format,
+// but most MTAs seem to send a plain email with an X-Failed-Recipients
+// header instead. We do the latter for now, and may adopt the former later.
+func deliveryStatusNotification(item *Item) ([]byte, error) {
+	// Quote at most maxOrigMsgLen bytes of the original message.
+	quoted := item.Data
+	if len(quoted) > maxOrigMsgLen {
+		quoted = quoted[:maxOrigMsgLen]
+	}
+
+	info := dsnInfo{
+		OurDomain:       item.Hostname,
+		Destination:     item.From,
+		MessageID:       "chasquid-dsn-" + <-newID + "@" + item.Hostname,
+		Date:            time.Now().Format(time.RFC1123Z),
+		To:              item.To,
+		Recipients:      item.Rcpt,
+		OriginalMessage: string(quoted),
+	}
+	for _, r := range item.Rcpt {
+		if r.Status == Recipient_FAILED {
+			info.FailedRcpts = append(info.FailedRcpts, r)
+		}
+	}
+
+	var out bytes.Buffer
+	err := dsnTemplate.Execute(&out, info)
+	return out.Bytes(), err
+}
+
+type dsnInfo struct {
+	OurDomain       string       // item.Hostname; domain of the From: address
+	Destination     string       // item.From; the DSN is addressed to it
+	MessageID       string       // value of the Message-ID header
+	Date            string       // value of the Date header (RFC1123Z)
+	To              []string     // item.To
+	Recipients      []*Recipient // item.Rcpt, regardless of status
+	FailedRcpts     []*Recipient // entries of item.Rcpt with status FAILED
+	OriginalMessage string       // item.Data, cut to maxOrigMsgLen bytes
+}
+
+// dsnTemplate is the DSN email; per-recipient errors cover FailedRcpts only.
+var dsnTemplate = template.Must(template.New("dsn").Parse(`From: Mail Delivery System <postmaster-dsn@{{.OurDomain}}>
+To: <{{.Destination}}>
+Subject: Mail delivery failed: returning message to sender
+Message-ID: <{{.MessageID}}>
+Date: {{.Date}}
+X-Failed-Recipients: {{range .To}}{{.}}, {{end}}
+Auto-Submitted: auto-replied
+
+Delivery to the following recipient(s) failed permanently:
+
+  {{range .To -}} - {{.}}
+  {{end}}
+
+----- Technical details -----
+{{range .FailedRcpts}}
+- "{{.Address}}" ({{.Type}}) failed with error:
+    {{.LastFailureMessage}}
+{{end}}
+
+----- Original message -----
+
+{{.OriginalMessage}}
+
+`))
diff --git a/internal/queue/dsn_test.go b/internal/queue/dsn_test.go
new file mode 100644
index 0000000..8c3df7e
--- /dev/null
+++ b/internal/queue/dsn_test.go
@@ -0,0 +1,27 @@
+package queue
+
+import "testing"
+
+// TestDSN builds a DSN for an item whose two recipients both failed, and logs
+// the result. It only checks that the template renders without error.
+func TestDSN(t *testing.T) {
+	failed := []*Recipient{
+		{"poe@rcpt", Recipient_EMAIL, Recipient_FAILED, "oh! horror!"},
+		{"newman@rcpt", Recipient_EMAIL, Recipient_FAILED, "oh! the humanity!"},
+	}
+	item := &Item{Message: Message{
+		ID:       <-newID,
+		From:     "from@from.org",
+		To:       []string{"toto@africa.org", "negra@sosa.org"},
+		Rcpt:     failed,
+		Data:     []byte("data ñaca"),
+		Hostname: "from.org",
+	}}
+
+	dsn, err := deliveryStatusNotification(item)
+	if err != nil {
+		t.Error(err)
+	}
+
+	t.Log(string(dsn))
+}
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 3c21212..865722e 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -143,21 +143,24 @@ func (q *Queue) Len() int {
 }
 
 // Put an envelope in the queue.
-func (q *Queue) Put(from string, to []string, data []byte) (string, error) {
+func (q *Queue) Put(hostname, from string, to []string, data []byte) (string, error) {
 	if q.Len() >= maxQueueSize {
 		return "", errQueueFull
 	}
 
 	item := &Item{
 		Message: Message{
-			ID:   <-newID,
-			From: from,
-			Data: data,
+			ID:       <-newID,
+			From:     from,
+			Data:     data,
+			Hostname: hostname,
 		},
 		CreatedAt: time.Now(),
 	}
 
 	for _, t := range to {
+		item.To = append(item.To, t)
+
 		rcpts, err := q.aliases.Resolve(t)
 		if err != nil {
 			return "", fmt.Errorf("error resolving aliases for %q: %v", t, err)
@@ -193,7 +196,7 @@ func (q *Queue) Put(from string, to []string, data []byte) (string, error) {
 	q.q[item.ID] = item
 	q.mu.Unlock()
 
-	glog.Infof("%s accepted from %q", item.ID, from)
+	glog.Infof("%s accepted from %q  to  %v", item.ID, from, to)
 
 	// Begin to send it right away.
 	go item.SendLoop(q)
@@ -258,7 +261,6 @@ func (item *Item) WriteTo(dir string) error {
 }
 
 func (item *Item) SendLoop(q *Queue) {
-
 	tr := trace.New("Queue", item.ID)
 	defer tr.Finish()
 	tr.LazyPrintf("from: %s", item.From)
@@ -304,6 +306,9 @@ func (item *Item) SendLoop(q *Queue) {
 				if oldStatus != status {
 					item.Lock()
 					rcpt.Status = status
+					if err != nil {
+						rcpt.LastFailureMessage = err.Error()
+					}
 					item.Unlock()
 
 					err = item.WriteTo(q.path)
@@ -316,20 +321,15 @@ func (item *Item) SendLoop(q *Queue) {
 		}
 		wg.Wait()
 
+		// If they're all done, no need to wait.
 		pending := 0
 		for _, rcpt := range item.Rcpt {
 			if rcpt.Status == Recipient_PENDING {
 				pending++
 			}
 		}
-
 		if pending == 0 {
-			// Completed to all recipients (some may not have succeeded).
-			tr.LazyPrintf("all done")
-			glog.Infof("%s all done", item.ID)
-
-			q.Remove(item.ID)
-			return
+			break
 		}
 
 		// TODO: Consider sending a non-final notification after 30m or so,
@@ -341,8 +341,41 @@ func (item *Item) SendLoop(q *Queue) {
 		time.Sleep(delay)
 	}
 
-	// TODO: Send a notification message for the recipients we failed to send,
-	// remove item from the queue, and remove from disk.
+	// Completed to all recipients (some may not have succeeded).
+	tr.LazyPrintf("all done")
+	glog.Infof("%s all done", item.ID)
+
+	failed := 0
+	for _, rcpt := range item.Rcpt {
+		if rcpt.Status == Recipient_FAILED {
+			failed++
+		}
+	}
+
+	if failed > 0 && item.From != "<>" {
+		sendDSN(tr, q, item)
+	}
+
+	q.Remove(item.ID)
+
+	return
+}
+
+// sendDSN builds a DSN for item and queues it back to item.From, with "<>"
+// as the envelope sender. Failures are only recorded in the trace and log.
+func sendDSN(tr trace.Trace, q *Queue, item *Item) {
+	tr.LazyPrintf("sending DSN")
+
+	body, buildErr := deliveryStatusNotification(item)
+	if buildErr != nil {
+		tr.LazyPrintf("failed to build DSN: %v", buildErr)
+		glog.Infof("%s: failed to build DSN: %v", item.ID, buildErr)
+		return
+	}
+	if _, err := q.Put(item.Hostname, "<>", []string{item.From}, body); err != nil {
+		tr.LazyPrintf("failed to queue DSN: %v", err)
+		glog.Infof("%s: failed to queue DSN: %v", item.ID, err)
+	}
+}
 
 // deliver the item to the given recipient, using the couriers from the queue.
diff --git a/internal/queue/queue.pb.go b/internal/queue/queue.pb.go
index 0c7e655..9140e56 100644
--- a/internal/queue/queue.pb.go
+++ b/internal/queue/queue.pb.go
@@ -81,10 +81,13 @@ type Message struct {
 	ID string `protobuf:"bytes,1,opt,name=ID,json=iD" json:"ID,omitempty"`
 	// The envelope for this message.
 	From string       `protobuf:"bytes,2,opt,name=from" json:"from,omitempty"`
-	Rcpt []*Recipient `protobuf:"bytes,3,rep,name=rcpt" json:"rcpt,omitempty"`
-	Data []byte       `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
+	To   []string     `protobuf:"bytes,3,rep,name=To,json=to" json:"To,omitempty"`
+	Rcpt []*Recipient `protobuf:"bytes,4,rep,name=rcpt" json:"rcpt,omitempty"`
+	Data []byte       `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"`
 	// Creation timestamp.
-	CreatedAtTs *google_protobuf.Timestamp `protobuf:"bytes,5,opt,name=created_at_ts,json=createdAtTs" json:"created_at_ts,omitempty"`
+	CreatedAtTs *google_protobuf.Timestamp `protobuf:"bytes,6,opt,name=created_at_ts,json=createdAtTs" json:"created_at_ts,omitempty"`
+	// Hostname of the server receiving this message.
+	Hostname string `protobuf:"bytes,7,opt,name=hostname" json:"hostname,omitempty"`
 }
 
 func (m *Message) Reset()                    { *m = Message{} }
@@ -107,9 +110,10 @@ func (m *Message) GetCreatedAtTs() *google_protobuf.Timestamp {
 }
 
 type Recipient struct {
-	Address string           `protobuf:"bytes,1,opt,name=address" json:"address,omitempty"`
-	Type    Recipient_Type   `protobuf:"varint,2,opt,name=type,enum=queue.Recipient_Type" json:"type,omitempty"`
-	Status  Recipient_Status `protobuf:"varint,3,opt,name=status,enum=queue.Recipient_Status" json:"status,omitempty"`
+	Address            string           `protobuf:"bytes,1,opt,name=address" json:"address,omitempty"`
+	Type               Recipient_Type   `protobuf:"varint,2,opt,name=type,enum=queue.Recipient_Type" json:"type,omitempty"`
+	Status             Recipient_Status `protobuf:"varint,3,opt,name=status,enum=queue.Recipient_Status" json:"status,omitempty"`
+	LastFailureMessage string           `protobuf:"bytes,4,opt,name=last_failure_message,json=lastFailureMessage" json:"last_failure_message,omitempty"`
 }
 
 func (m *Recipient) Reset()                    { *m = Recipient{} }
@@ -127,26 +131,29 @@ func init() {
 func init() { proto.RegisterFile("queue.proto", fileDescriptor0) }
 
 var fileDescriptor0 = []byte{
-	// 332 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x8f, 0x51, 0x4b, 0xc3, 0x30,
-	0x14, 0x85, 0x6d, 0xd7, 0x75, 0xee, 0x56, 0x47, 0x09, 0x88, 0x65, 0xbe, 0x8c, 0xe2, 0xc3, 0x44,
-	0x68, 0x61, 0x3e, 0x0a, 0xc2, 0xa0, 0x55, 0x0a, 0x6e, 0x8c, 0xac, 0xef, 0x23, 0x6d, 0xb3, 0x5a,
-	0x58, 0x97, 0xda, 0xa4, 0x0f, 0xfe, 0x22, 0x7f, 0x8c, 0x7f, 0xca, 0x34, 0xed, 0x14, 0xf4, 0xed,
-	0xde, 0x9c, 0x73, 0x72, 0xbf, 0x03, 0xd6, 0x7b, 0x43, 0x1b, 0xea, 0x55, 0x35, 0x13, 0x0c, 0x0d,
-	0xd5, 0x32, 0x7d, 0xcc, 0x0b, 0xf1, 0xd6, 0x24, 0x5e, 0xca, 0x4a, 0x3f, 0x67, 0x07, 0x72, 0xcc,
-	0x7d, 0xa5, 0x27, 0xcd, 0xde, 0xaf, 0xc4, 0x47, 0x45, 0xb9, 0x2f, 0x8a, 0x92, 0x72, 0x41, 0xca,
-	0xea, 0x77, 0xea, 0xfe, 0x70, 0x3f, 0x35, 0x18, 0xad, 0x28, 0xe7, 0x24, 0xa7, 0x68, 0x02, 0x7a,
-	0x14, 0x38, 0xda, 0x4c, 0x9b, 0x8f, 0xb1, 0x5e, 0x04, 0x08, 0x81, 0xb1, 0xaf, 0x59, 0xe9, 0xe8,
-	0xea, 0x45, 0xcd, 0xe8, 0x16, 0x8c, 0x3a, 0xad, 0x84, 0x33, 0x98, 0x0d, 0xe6, 0xd6, 0xc2, 0xf6,
-	0x3a, 0x1e, 0x4c, 0xd3, 0xa2, 0x2a, 0xe8, 0x51, 0x60, 0xa5, 0xb6, 0xc9, 0x8c, 0x08, 0xe2, 0x18,
-	0x32, 0x79, 0x81, 0xd5, 0x8c, 0x9e, 0xe0, 0x32, 0xad, 0x29, 0x11, 0x34, 0xdb, 0x11, 0xb1, 0x13,
-	0xdc, 0x19, 0x4a, 0xd1, 0x5a, 0x4c, 0xbd, 0x9c, 0xb1, 0xfc, 0xd0, 0x77, 0x92, 0xcc, 0x5e, 0x7c,
-	0x42, 0xc4, 0x56, 0x1f, 0x58, 0x8a, 0x98, 0xbb, 0x5f, 0x1a, 0x8c, 0x7f, 0xee, 0x20, 0x07, 0x46,
-	0x24, 0xcb, 0x6a, 0x49, 0xde, 0x03, 0x9f, 0x56, 0x74, 0x07, 0x46, 0x5b, 0x5a, 0x51, 0x4f, 0x16,
-	0x57, 0x7f, 0x09, 0xbd, 0x58, 0x8a, 0x58, 0x59, 0x90, 0x0f, 0xa6, 0x3c, 0x24, 0x1a, 0x2e, 0xeb,
-	0xb4, 0xe6, 0xeb, 0x7f, 0xe6, 0xad, 0x92, 0x71, 0x6f, 0x73, 0x6f, 0xc0, 0x68, 0xe3, 0x68, 0x0c,
-	0xc3, 0x70, 0xb5, 0x8c, 0x5e, 0xed, 0x33, 0x74, 0x0e, 0xc6, 0x26, 0xda, 0x84, 0xb6, 0xe6, 0xde,
-	0x83, 0xd9, 0xd9, 0x91, 0x05, 0xa3, 0x4d, 0xb8, 0x0e, 0xa2, 0xf5, 0x4b, 0x67, 0xd8, 0x86, 0xeb,
-	0xd8, 0xd6, 0x10, 0x80, 0xf9, 0x2c, 0x43, 0x61, 0x60, 0xeb, 0x89, 0xa9, 0xea, 0x3e, 0x7c, 0x07,
-	0x00, 0x00, 0xff, 0xff, 0xd1, 0x22, 0xa1, 0x0c, 0xd1, 0x01, 0x00, 0x00,
+	// 382 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x50, 0x41, 0xcb, 0x9b, 0x40,
+	0x10, 0xad, 0x7e, 0x46, 0x93, 0xb1, 0x0d, 0xb2, 0xb4, 0x74, 0x49, 0x2f, 0x41, 0x7a, 0x68, 0x29,
+	0x68, 0x49, 0x8f, 0x85, 0x42, 0x40, 0x53, 0x84, 0x26, 0x04, 0xe3, 0x5d, 0x36, 0xba, 0x31, 0x82,
+	0x66, 0xad, 0xbb, 0x1e, 0xfa, 0x3b, 0xfb, 0x7b, 0x0a, 0xdd, 0x5d, 0x4d, 0x0a, 0xfd, 0x6e, 0x33,
+	0xf3, 0xde, 0xcc, 0xbc, 0xf7, 0xc0, 0xfd, 0x39, 0xd0, 0x81, 0x06, 0x5d, 0xcf, 0x04, 0x43, 0x33,
+	0xdd, 0xac, 0xbe, 0x56, 0xb5, 0xb8, 0x0e, 0xe7, 0xa0, 0x60, 0x6d, 0x58, 0xb1, 0x86, 0xdc, 0xaa,
+	0x50, 0xe3, 0xe7, 0xe1, 0x12, 0x76, 0xe2, 0x57, 0x47, 0x79, 0x28, 0xea, 0x96, 0x72, 0x41, 0xda,
+	0xee, 0x5f, 0x35, 0xde, 0xf0, 0x7f, 0x1b, 0xe0, 0xec, 0x29, 0xe7, 0xa4, 0xa2, 0x68, 0x09, 0x66,
+	0x12, 0x61, 0x63, 0x6d, 0x7c, 0x58, 0xa4, 0x66, 0x1d, 0x21, 0x04, 0xd6, 0xa5, 0x67, 0x2d, 0x36,
+	0xf5, 0x44, 0xd7, 0x8a, 0x93, 0x31, 0xfc, 0xb4, 0x7e, 0x52, 0x1c, 0xa9, 0xe1, 0x3d, 0x58, 0x7d,
+	0xd1, 0x09, 0x6c, 0xc9, 0x89, 0xbb, 0xf1, 0x82, 0x51, 0x5f, 0x4a, 0x8b, 0xba, 0xab, 0xe9, 0x4d,
+	0xa4, 0x1a, 0x55, 0x97, 0x4a, 0x22, 0x08, 0x9e, 0xc9, 0x4b, 0x2f, 0x53, 0x5d, 0xa3, 0x6f, 0xf0,
+	0xaa, 0xe8, 0x29, 0x11, 0xb4, 0xcc, 0x89, 0xc8, 0x05, 0xc7, 0xb6, 0x04, 0xdd, 0xcd, 0x2a, 0xa8,
+	0x18, 0xab, 0x9a, 0xc9, 0xa3, 0xf4, 0x10, 0x64, 0x77, 0xc9, 0xa9, 0x3b, 0x2d, 0x6c, 0x45, 0xc6,
+	0xd1, 0x0a, 0xe6, 0x57, 0xc6, 0xc5, 0x8d, 0xb4, 0x14, 0x3b, 0x5a, 0xe1, 0xa3, 0xf7, 0xff, 0x18,
+	0xb0, 0x78, 0x68, 0x40, 0x18, 0x1c, 0x52, 0x96, 0xbd, 0x74, 0x39, 0x99, 0xbb, 0xb7, 0xe8, 0x23,
+	0x58, 0x2a, 0x20, 0xed, 0x70, 0xb9, 0x79, 0xf3, 0xbf, 0xfa, 0x20, 0x93, 0x60, 0xaa, 0x29, 0x28,
+	0x04, 0x5b, 0x8a, 0x10, 0x03, 0x97, 0xe6, 0x15, 0xf9, 0xed, 0x33, 0xf2, 0x49, 0xc3, 0xe9, 0x44,
+	0x43, 0x9f, 0xe1, 0x75, 0x43, 0xb8, 0xc8, 0x2f, 0xa4, 0x6e, 0x86, 0x9e, 0xe6, 0xed, 0x98, 0xb2,
+	0x4c, 0x4a, 0x49, 0x40, 0x0a, 0xdb, 0x8d, 0xd0, 0x94, 0xbf, 0xff, 0x0e, 0x2c, 0xf5, 0x10, 0x2d,
+	0x60, 0x16, 0xef, 0xb7, 0xc9, 0x0f, 0xef, 0x05, 0x9a, 0x83, 0x75, 0x4c, 0x8e, 0xb1, 0x67, 0xf8,
+	0x9f, 0xc0, 0x1e, 0x1f, 0x20, 0x17, 0x9c, 0x63, 0x7c, 0x88, 0x92, 0xc3, 0xf7, 0x91, 0x70, 0x8a,
+	0x0f, 0x99, 0x67, 0x20, 0x00, 0x7b, 0x27, 0x97, 0xe2, 0xc8, 0x33, 0xcf, 0xb6, 0x0e, 0xef, 0xcb,
+	0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x32, 0x94, 0x20, 0x2f, 0x02, 0x00, 0x00,
 }
diff --git a/internal/queue/queue.proto b/internal/queue/queue.proto
index 4be4a32..f821e62 100644
--- a/internal/queue/queue.proto
+++ b/internal/queue/queue.proto
@@ -13,11 +13,15 @@ message Message {
 
 	// The envelope for this message.
 	string from = 2;
-	repeated Recipient rcpt = 3;
-	bytes data = 4;
+	repeated string To = 3;
+	repeated Recipient rcpt = 4;
+	bytes data = 5;
 
 	// Creation timestamp.
-	google.protobuf.Timestamp created_at_ts = 5;
+	google.protobuf.Timestamp created_at_ts = 6;
+
+	// Hostname of the server receiving this message.
+	string hostname = 7;
 }
 
 message Recipient {
@@ -35,5 +39,7 @@ message Recipient {
 		FAILED = 2;
 	}
 	Status status = 3;
+
+	string last_failure_message = 4;
 }
 
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index a750849..5e8262e 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -65,7 +65,7 @@ func TestBasic(t *testing.T) {
 
 	localC.wg.Add(2)
 	remoteC.wg.Add(1)
-	id, err := q.Put("from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data"))
+	id, err := q.Put("host", "from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data"))
 	if err != nil {
 		t.Fatalf("Put: %v", err)
 	}
@@ -110,7 +110,8 @@ func TestFullQueue(t *testing.T) {
 			Message: Message{
 				ID:   <-newID,
 				From: fmt.Sprintf("from-%d", i),
-				Rcpt: []*Recipient{{"to", Recipient_EMAIL, Recipient_PENDING}},
+				Rcpt: []*Recipient{
+					{"to", Recipient_EMAIL, Recipient_PENDING, ""}},
 				Data: []byte("data"),
 			},
 			CreatedAt: time.Now(),
@@ -120,7 +121,7 @@ func TestFullQueue(t *testing.T) {
 	}
 
 	// This one should fail due to the queue being too big.
-	id, err := q.Put("from", []string{"to"}, []byte("data-qf"))
+	id, err := q.Put("host", "from", []string{"to"}, []byte("data-qf"))
 	if err != errQueueFull {
 		t.Errorf("Not failed as expected: %v - %v", id, err)
 	}
@@ -131,7 +132,7 @@ func TestFullQueue(t *testing.T) {
 	q.q[oneID].WriteTo(q.path)
 	q.Remove(oneID)
 
-	id, err = q.Put("from", []string{"to"}, []byte("data"))
+	id, err = q.Put("host", "from", []string{"to"}, []byte("data"))
 	if err != nil {
 		t.Errorf("Put: %v", err)
 	}
@@ -162,17 +163,17 @@ func TestAliases(t *testing.T) {
 		expected []*Recipient
 	}{
 		{[]string{"ab@loco"}, []*Recipient{
-			{"pq@loco", Recipient_EMAIL, Recipient_PENDING},
-			{"rs@loco", Recipient_EMAIL, Recipient_PENDING},
-			{"command", Recipient_PIPE, Recipient_PENDING}}},
+			{"pq@loco", Recipient_EMAIL, Recipient_PENDING, ""},
+			{"rs@loco", Recipient_EMAIL, Recipient_PENDING, ""},
+			{"command", Recipient_PIPE, Recipient_PENDING, ""}}},
 		{[]string{"ab@loco", "cd@loco"}, []*Recipient{
-			{"pq@loco", Recipient_EMAIL, Recipient_PENDING},
-			{"rs@loco", Recipient_EMAIL, Recipient_PENDING},
-			{"command", Recipient_PIPE, Recipient_PENDING},
-			{"ata@hualpa", Recipient_EMAIL, Recipient_PENDING}}},
+			{"pq@loco", Recipient_EMAIL, Recipient_PENDING, ""},
+			{"rs@loco", Recipient_EMAIL, Recipient_PENDING, ""},
+			{"command", Recipient_PIPE, Recipient_PENDING, ""},
+			{"ata@hualpa", Recipient_EMAIL, Recipient_PENDING, ""}}},
 	}
 	for _, c := range cases {
-		id, err := q.Put("from", c.to, []byte("data"))
+		id, err := q.Put("host", "from", c.to, []byte("data"))
 		if err != nil {
 			t.Errorf("Put: %v", err)
 		}
@@ -192,7 +193,7 @@ func TestPipes(t *testing.T) {
 			ID:   <-newID,
 			From: "from",
 			Rcpt: []*Recipient{
-				{"true", Recipient_PIPE, Recipient_PENDING}},
+				{"true", Recipient_PIPE, Recipient_PENDING, ""}},
 			Data: []byte("data"),
 		},
 		CreatedAt: time.Now(),
diff --git a/test/t-03-queue_persistency/addtoqueue.go b/test/t-03-queue_persistency/addtoqueue.go
index bfe7af8..03b2aab 100644
--- a/test/t-03-queue_persistency/addtoqueue.go
+++ b/test/t-03-queue_persistency/addtoqueue.go
@@ -35,8 +35,9 @@ func main() {
 		Message: queue.Message{
 			ID:   *id,
 			From: *from,
+			To:   []string{*rcpt},
 			Rcpt: []*queue.Recipient{
-				{*rcpt, queue.Recipient_EMAIL, queue.Recipient_PENDING},
+				{*rcpt, queue.Recipient_EMAIL, queue.Recipient_PENDING, ""},
 			},
 			Data: data,
 		},