author | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-09-25 14:50:17 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-10-09 23:51:04 UTC |
parent | 927a74aa3cf00d7524a8c1688ec30722d74c697b |
chasquid.go | +1 | -1 |
internal/config/config.pb.go | +5 | -2 |
internal/config/config.proto | +5 | -2 |
internal/queue/dsn.go | +80 | -0 |
internal/queue/dsn_test.go | +27 | -0 |
internal/queue/queue.go | +48 | -15 |
internal/queue/queue.pb.go | +35 | -28 |
internal/queue/queue.proto | +9 | -3 |
internal/queue/queue_test.go | +14 | -13 |
test/t-03-queue_persistency/addtoqueue.go | +2 | -1 |
diff --git a/chasquid.go b/chasquid.go index ff33a0a..adbfae6 100644 --- a/chasquid.go +++ b/chasquid.go @@ -680,7 +680,7 @@ func (c *Conn) DATA(params string, tr *trace.Trace) (code int, msg string) { // There are no partial failures here: we put it in the queue, and then if // individual deliveries fail, we report via email. - msgID, err := c.queue.Put(c.mailFrom, c.rcptTo, c.data) + msgID, err := c.queue.Put(c.hostname, c.mailFrom, c.rcptTo, c.data) if err != nil { tr.LazyPrintf(" error queueing: %v", err) tr.SetError() diff --git a/internal/config/config.pb.go b/internal/config/config.pb.go index 76976d8..e37e6c0 100644 --- a/internal/config/config.pb.go +++ b/internal/config/config.pb.go @@ -29,8 +29,11 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type Config struct { - // Hostname to use when we say hello. - // For aesthetic purposes, but may help if our ip address resolves to it. + // Main/default hostname to use. + // This is used to say hello to clients, and by default as the domain + // we send delivery notifications errors from. + // It should be a domain we can send email from. + // It usually helps if our IP address resolves to it. // Default: machine hostname. Hostname string `protobuf:"bytes,1,opt,name=hostname" json:"hostname,omitempty"` // Maximum email size, in megabytes. diff --git a/internal/config/config.proto b/internal/config/config.proto index b68c159..b57e335 100644 --- a/internal/config/config.proto +++ b/internal/config/config.proto @@ -2,8 +2,11 @@ syntax = "proto3"; message Config { - // Hostname to use when we say hello. - // For aesthetic purposes, but may help if our ip address resolves to it. + // Main/default hostname to use. + // This is used to say hello to clients, and by default as the domain + // we send delivery notifications errors from. + // It should be a domain we can send email from. + // It usually helps if our IP address resolves to it. // Default: machine hostname. string hostname = 1; diff --git a/internal/queue/dsn.go b/internal/queue/dsn.go new file mode 100644 index 0000000..0300b3e --- /dev/null +++ b/internal/queue/dsn.go @@ -0,0 +1,80 @@ +package queue + +import ( + "bytes" + "text/template" + "time" +) + +// Maximum length of the original message to include in the DSN. +const maxOrigMsgLen = 256 * 1024 + +// deliveryStatusNotification creates a delivery status notification (DSN) for +// the given item, and puts it in the queue. +// +// There is a standard, https://tools.ietf.org/html/rfc3464, although most +// MTAs seem to use a plain email and include an X-Failed-Recipients header. +// We're going with the latter for now, may extend it to the former later. +func deliveryStatusNotification(item *Item) ([]byte, error) { + info := dsnInfo{ + OurDomain: item.Hostname, + Destination: item.From, + MessageID: "chasquid-dsn-" + <-newID + "@" + item.Hostname, + Date: time.Now().Format(time.RFC1123Z), + To: item.To, + Recipients: item.Rcpt, + } + + for _, rcpt := range item.Rcpt { + if rcpt.Status == Recipient_FAILED { + info.FailedRcpts = append(info.FailedRcpts, rcpt) + } + } + + if len(item.Data) > maxOrigMsgLen { + info.OriginalMessage = string(item.Data[:maxOrigMsgLen]) + } else { + info.OriginalMessage = string(item.Data) + } + + buf := &bytes.Buffer{} + err := dsnTemplate.Execute(buf, info) + return buf.Bytes(), err +} + +type dsnInfo struct { + OurDomain string + Destination string + MessageID string + Date string + To []string + Recipients []*Recipient + FailedRcpts []*Recipient + OriginalMessage string +} + +var dsnTemplate = template.Must(template.New("dsn").Parse( + `From: Mail Delivery System <postmaster-dsn@{{.OurDomain}}> +To: <{{.Destination}}> +Subject: Mail delivery failed: returning message to sender +Message-ID: <{{.MessageID}}> +Date: {{.Date}} +X-Failed-Recipients: {{range .To}}{{.}}, {{end}} +Auto-Submitted: auto-replied + +Delivery to the following recipient(s) failed permanently: + + {{range .To -}} - {{.}} + {{end}} + +----- Technical details ----- +{{range .Recipients}} +- "{{.Address}}" ({{.Type}}) failed with error: + {{.LastFailureMessage}} +{{end}} + +----- Original message ----- + +{{.OriginalMessage}} + +`)) diff --git a/internal/queue/dsn_test.go b/internal/queue/dsn_test.go new file mode 100644 index 0000000..8c3df7e --- /dev/null +++ b/internal/queue/dsn_test.go @@ -0,0 +1,27 @@ +package queue + +import "testing" + +func TestDSN(t *testing.T) { + item := &Item{ + Message: Message{ + ID: <-newID, + From: "from@from.org", + To: []string{"toto@africa.org", "negra@sosa.org"}, + Rcpt: []*Recipient{ + {"poe@rcpt", Recipient_EMAIL, Recipient_FAILED, + "oh! horror!"}, + {"newman@rcpt", Recipient_EMAIL, Recipient_FAILED, + "oh! the humanity!"}}, + Data: []byte("data ñaca"), + Hostname: "from.org", + }, + } + + msg, err := deliveryStatusNotification(item) + if err != nil { + t.Error(err) + } + + t.Log(string(msg)) +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 3c21212..865722e 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -143,21 +143,24 @@ func (q *Queue) Len() int { } // Put an envelope in the queue. -func (q *Queue) Put(from string, to []string, data []byte) (string, error) { +func (q *Queue) Put(hostname, from string, to []string, data []byte) (string, error) { if q.Len() >= maxQueueSize { return "", errQueueFull } item := &Item{ Message: Message{ - ID: <-newID, - From: from, - Data: data, + ID: <-newID, + From: from, + Data: data, + Hostname: hostname, }, CreatedAt: time.Now(), } for _, t := range to { + item.To = append(item.To, t) + rcpts, err := q.aliases.Resolve(t) if err != nil { return "", fmt.Errorf("error resolving aliases for %q: %v", t, err) @@ -193,7 +196,7 @@ func (q *Queue) Put(from string, to []string, data []byte) (string, error) { q.q[item.ID] = item q.mu.Unlock() - glog.Infof("%s accepted from %q", item.ID, from) + glog.Infof("%s accepted from %q to %v", item.ID, from, to) // Begin to send it right away. go item.SendLoop(q) @@ -258,7 +261,6 @@ func (item *Item) WriteTo(dir string) error { } func (item *Item) SendLoop(q *Queue) { - tr := trace.New("Queue", item.ID) defer tr.Finish() tr.LazyPrintf("from: %s", item.From) @@ -304,6 +306,9 @@ func (item *Item) SendLoop(q *Queue) { if oldStatus != status { item.Lock() rcpt.Status = status + if err != nil { + rcpt.LastFailureMessage = err.Error() + } item.Unlock() err = item.WriteTo(q.path) @@ -316,20 +321,15 @@ func (item *Item) SendLoop(q *Queue) { } wg.Wait() + // If they're all done, no need to wait. pending := 0 for _, rcpt := range item.Rcpt { if rcpt.Status == Recipient_PENDING { pending++ } } - if pending == 0 { - // Completed to all recipients (some may not have succeeded). - tr.LazyPrintf("all done") - glog.Infof("%s all done", item.ID) - - q.Remove(item.ID) - return + break } // TODO: Consider sending a non-final notification after 30m or so, @@ -341,8 +341,41 @@ func (item *Item) SendLoop(q *Queue) { time.Sleep(delay) } - // TODO: Send a notification message for the recipients we failed to send, - // remove item from the queue, and remove from disk. + // Completed to all recipients (some may not have succeeded). + tr.LazyPrintf("all done") + glog.Infof("%s all done", item.ID) + + failed := 0 + for _, rcpt := range item.Rcpt { + if rcpt.Status == Recipient_FAILED { + failed++ + } + } + + if failed > 0 && item.From != "<>" { + sendDSN(tr, q, item) + } + + q.Remove(item.ID) + + return +} + +func sendDSN(tr trace.Trace, q *Queue, item *Item) { + tr.LazyPrintf("sending DSN") + + msg, err := deliveryStatusNotification(item) + if err != nil { + tr.LazyPrintf("failed to build DSN: %v", err) + glog.Infof("%s: failed to build DSN: %v", item.ID, err) + return + } + + _, err = q.Put(item.Hostname, "<>", []string{item.From}, msg) + if err != nil { + tr.LazyPrintf("failed to queue DSN: %v", err) + glog.Infof("%s: failed to queue DSN: %v", item.ID, err) + } } // deliver the item to the given recipient, using the couriers from the queue. diff --git a/internal/queue/queue.pb.go b/internal/queue/queue.pb.go index 0c7e655..9140e56 100644 --- a/internal/queue/queue.pb.go +++ b/internal/queue/queue.pb.go @@ -81,10 +81,13 @@ type Message struct { ID string `protobuf:"bytes,1,opt,name=ID,json=iD" json:"ID,omitempty"` // The envelope for this message. From string `protobuf:"bytes,2,opt,name=from" json:"from,omitempty"` - Rcpt []*Recipient `protobuf:"bytes,3,rep,name=rcpt" json:"rcpt,omitempty"` - Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` + To []string `protobuf:"bytes,3,rep,name=To,json=to" json:"To,omitempty"` + Rcpt []*Recipient `protobuf:"bytes,4,rep,name=rcpt" json:"rcpt,omitempty"` + Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` // Creation timestamp. - CreatedAtTs *google_protobuf.Timestamp `protobuf:"bytes,5,opt,name=created_at_ts,json=createdAtTs" json:"created_at_ts,omitempty"` + CreatedAtTs *google_protobuf.Timestamp `protobuf:"bytes,6,opt,name=created_at_ts,json=createdAtTs" json:"created_at_ts,omitempty"` + // Hostname of the server receiving this message. + Hostname string `protobuf:"bytes,7,opt,name=hostname" json:"hostname,omitempty"` } func (m *Message) Reset() { *m = Message{} } @@ -107,9 +110,10 @@ func (m *Message) GetCreatedAtTs() *google_protobuf.Timestamp { } type Recipient struct { - Address string `protobuf:"bytes,1,opt,name=address" json:"address,omitempty"` - Type Recipient_Type `protobuf:"varint,2,opt,name=type,enum=queue.Recipient_Type" json:"type,omitempty"` - Status Recipient_Status `protobuf:"varint,3,opt,name=status,enum=queue.Recipient_Status" json:"status,omitempty"` + Address string `protobuf:"bytes,1,opt,name=address" json:"address,omitempty"` + Type Recipient_Type `protobuf:"varint,2,opt,name=type,enum=queue.Recipient_Type" json:"type,omitempty"` + Status Recipient_Status `protobuf:"varint,3,opt,name=status,enum=queue.Recipient_Status" json:"status,omitempty"` + LastFailureMessage string `protobuf:"bytes,4,opt,name=last_failure_message,json=lastFailureMessage" json:"last_failure_message,omitempty"` } func (m *Recipient) Reset() { *m = Recipient{} } @@ -127,26 +131,29 @@ func init() { func init() { proto.RegisterFile("queue.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 332 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x8f, 0x51, 0x4b, 0xc3, 0x30, - 0x14, 0x85, 0x6d, 0xd7, 0x75, 0xee, 0x56, 0x47, 0x09, 0x88, 0x65, 0xbe, 0x8c, 0xe2, 0xc3, 0x44, - 0x68, 0x61, 0x3e, 0x0a, 0xc2, 0xa0, 0x55, 0x0a, 0x6e, 0x8c, 0xac, 0xef, 0x23, 0x6d, 0xb3, 0x5a, - 0x58, 0x97, 0xda, 0xa4, 0x0f, 0xfe, 0x22, 0x7f, 0x8c, 0x7f, 0xca, 0x34, 0xed, 0x14, 0xf4, 0xed, - 0xde, 0x9c, 0x73, 0x72, 0xbf, 0x03, 0xd6, 0x7b, 0x43, 0x1b, 0xea, 0x55, 0x35, 0x13, 0x0c, 0x0d, - 0xd5, 0x32, 0x7d, 0xcc, 0x0b, 0xf1, 0xd6, 0x24, 0x5e, 0xca, 0x4a, 0x3f, 0x67, 0x07, 0x72, 0xcc, - 0x7d, 0xa5, 0x27, 0xcd, 0xde, 0xaf, 0xc4, 0x47, 0x45, 0xb9, 0x2f, 0x8a, 0x92, 0x72, 0x41, 0xca, - 0xea, 0x77, 0xea, 0xfe, 0x70, 0x3f, 0x35, 0x18, 0xad, 0x28, 0xe7, 0x24, 0xa7, 0x68, 0x02, 0x7a, - 0x14, 0x38, 0xda, 0x4c, 0x9b, 0x8f, 0xb1, 0x5e, 0x04, 0x08, 0x81, 0xb1, 0xaf, 0x59, 0xe9, 0xe8, - 0xea, 0x45, 0xcd, 0xe8, 0x16, 0x8c, 0x3a, 0xad, 0x84, 0x33, 0x98, 0x0d, 0xe6, 0xd6, 0xc2, 0xf6, - 0x3a, 0x1e, 0x4c, 0xd3, 0xa2, 0x2a, 0xe8, 0x51, 0x60, 0xa5, 0xb6, 0xc9, 0x8c, 0x08, 0xe2, 0x18, - 0x32, 0x79, 0x81, 0xd5, 0x8c, 0x9e, 0xe0, 0x32, 0xad, 0x29, 0x11, 0x34, 0xdb, 0x11, 0xb1, 0x13, - 0xdc, 0x19, 0x4a, 0xd1, 0x5a, 0x4c, 0xbd, 0x9c, 0xb1, 0xfc, 0xd0, 0x77, 0x92, 0xcc, 0x5e, 0x7c, - 0x42, 0xc4, 0x56, 0x1f, 0x58, 0x8a, 0x98, 0xbb, 0x5f, 0x1a, 0x8c, 0x7f, 0xee, 0x20, 0x07, 0x46, - 0x24, 0xcb, 0x6a, 0x49, 0xde, 0x03, 0x9f, 0x56, 0x74, 0x07, 0x46, 0x5b, 0x5a, 0x51, 0x4f, 0x16, - 0x57, 0x7f, 0x09, 0xbd, 0x58, 0x8a, 0x58, 0x59, 0x90, 0x0f, 0xa6, 0x3c, 0x24, 0x1a, 0x2e, 0xeb, - 0xb4, 0xe6, 0xeb, 0x7f, 0xe6, 0xad, 0x92, 0x71, 0x6f, 0x73, 0x6f, 0xc0, 0x68, 0xe3, 0x68, 0x0c, - 0xc3, 0x70, 0xb5, 0x8c, 0x5e, 0xed, 0x33, 0x74, 0x0e, 0xc6, 0x26, 0xda, 0x84, 0xb6, 0xe6, 0xde, - 0x83, 0xd9, 0xd9, 0x91, 0x05, 0xa3, 0x4d, 0xb8, 0x0e, 0xa2, 0xf5, 0x4b, 0x67, 0xd8, 0x86, 0xeb, - 0xd8, 0xd6, 0x10, 0x80, 0xf9, 0x2c, 0x43, 0x61, 0x60, 0xeb, 0x89, 0xa9, 0xea, 0x3e, 0x7c, 0x07, - 0x00, 0x00, 0xff, 0xff, 0xd1, 0x22, 0xa1, 0x0c, 0xd1, 0x01, 0x00, 0x00, + // 382 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x50, 0x41, 0xcb, 0x9b, 0x40, + 0x10, 0xad, 0x7e, 0x46, 0x93, 0xb1, 0x0d, 0xb2, 0xb4, 0x74, 0x49, 0x2f, 0x41, 0x7a, 0x68, 0x29, + 0x68, 0x49, 0x8f, 0x85, 0x42, 0x40, 0x53, 0x84, 0x26, 0x04, 0xe3, 0x5d, 0x36, 0xba, 0x31, 0x82, + 0x66, 0xad, 0xbb, 0x1e, 0xfa, 0x3b, 0xfb, 0x7b, 0x0a, 0xdd, 0x5d, 0x4d, 0x0a, 0xfd, 0x6e, 0x33, + 0xf3, 0xde, 0xcc, 0xbc, 0xf7, 0xc0, 0xfd, 0x39, 0xd0, 0x81, 0x06, 0x5d, 0xcf, 0x04, 0x43, 0x33, + 0xdd, 0xac, 0xbe, 0x56, 0xb5, 0xb8, 0x0e, 0xe7, 0xa0, 0x60, 0x6d, 0x58, 0xb1, 0x86, 0xdc, 0xaa, + 0x50, 0xe3, 0xe7, 0xe1, 0x12, 0x76, 0xe2, 0x57, 0x47, 0x79, 0x28, 0xea, 0x96, 0x72, 0x41, 0xda, + 0xee, 0x5f, 0x35, 0xde, 0xf0, 0x7f, 0x1b, 0xe0, 0xec, 0x29, 0xe7, 0xa4, 0xa2, 0x68, 0x09, 0x66, + 0x12, 0x61, 0x63, 0x6d, 0x7c, 0x58, 0xa4, 0x66, 0x1d, 0x21, 0x04, 0xd6, 0xa5, 0x67, 0x2d, 0x36, + 0xf5, 0x44, 0xd7, 0x8a, 0x93, 0x31, 0xfc, 0xb4, 0x7e, 0x52, 0x1c, 0xa9, 0xe1, 0x3d, 0x58, 0x7d, + 0xd1, 0x09, 0x6c, 0xc9, 0x89, 0xbb, 0xf1, 0x82, 0x51, 0x5f, 0x4a, 0x8b, 0xba, 0xab, 0xe9, 0x4d, + 0xa4, 0x1a, 0x55, 0x97, 0x4a, 0x22, 0x08, 0x9e, 0xc9, 0x4b, 0x2f, 0x53, 0x5d, 0xa3, 0x6f, 0xf0, + 0xaa, 0xe8, 0x29, 0x11, 0xb4, 0xcc, 0x89, 0xc8, 0x05, 0xc7, 0xb6, 0x04, 0xdd, 0xcd, 0x2a, 0xa8, + 0x18, 0xab, 0x9a, 0xc9, 0xa3, 0xf4, 0x10, 0x64, 0x77, 0xc9, 0xa9, 0x3b, 0x2d, 0x6c, 0x45, 0xc6, + 0xd1, 0x0a, 0xe6, 0x57, 0xc6, 0xc5, 0x8d, 0xb4, 0x14, 0x3b, 0x5a, 0xe1, 0xa3, 0xf7, 0xff, 0x18, + 0xb0, 0x78, 0x68, 0x40, 0x18, 0x1c, 0x52, 0x96, 0xbd, 0x74, 0x39, 0x99, 0xbb, 0xb7, 0xe8, 0x23, + 0x58, 0x2a, 0x20, 0xed, 0x70, 0xb9, 0x79, 0xf3, 0xbf, 0xfa, 0x20, 0x93, 0x60, 0xaa, 0x29, 0x28, + 0x04, 0x5b, 0x8a, 0x10, 0x03, 0x97, 0xe6, 0x15, 0xf9, 0xed, 0x33, 0xf2, 0x49, 0xc3, 0xe9, 0x44, + 0x43, 0x9f, 0xe1, 0x75, 0x43, 0xb8, 0xc8, 0x2f, 0xa4, 0x6e, 0x86, 0x9e, 0xe6, 0xed, 0x98, 0xb2, + 0x4c, 0x4a, 0x49, 0x40, 0x0a, 0xdb, 0x8d, 0xd0, 0x94, 0xbf, 0xff, 0x0e, 0x2c, 0xf5, 0x10, 0x2d, + 0x60, 0x16, 0xef, 0xb7, 0xc9, 0x0f, 0xef, 0x05, 0x9a, 0x83, 0x75, 0x4c, 0x8e, 0xb1, 0x67, 0xf8, + 0x9f, 0xc0, 0x1e, 0x1f, 0x20, 0x17, 0x9c, 0x63, 0x7c, 0x88, 0x92, 0xc3, 0xf7, 0x91, 0x70, 0x8a, + 0x0f, 0x99, 0x67, 0x20, 0x00, 0x7b, 0x27, 0x97, 0xe2, 0xc8, 0x33, 0xcf, 0xb6, 0x0e, 0xef, 0xcb, + 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x32, 0x94, 0x20, 0x2f, 0x02, 0x00, 0x00, } diff --git a/internal/queue/queue.proto b/internal/queue/queue.proto index 4be4a32..f821e62 100644 --- a/internal/queue/queue.proto +++ b/internal/queue/queue.proto @@ -13,11 +13,15 @@ message Message { // The envelope for this message. string from = 2; - repeated Recipient rcpt = 3; - bytes data = 4; + repeated string To = 3; + repeated Recipient rcpt = 4; + bytes data = 5; // Creation timestamp. - google.protobuf.Timestamp created_at_ts = 5; + google.protobuf.Timestamp created_at_ts = 6; + + // Hostname of the server receiving this message. + string hostname = 7; } message Recipient { @@ -35,5 +39,7 @@ message Recipient { FAILED = 2; } Status status = 3; + + string last_failure_message = 4; } diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index a750849..5e8262e 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -65,7 +65,7 @@ func TestBasic(t *testing.T) { localC.wg.Add(2) remoteC.wg.Add(1) - id, err := q.Put("from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data")) + id, err := q.Put("host", "from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data")) if err != nil { t.Fatalf("Put: %v", err) } @@ -110,7 +110,8 @@ func TestFullQueue(t *testing.T) { Message: Message{ ID: <-newID, From: fmt.Sprintf("from-%d", i), - Rcpt: []*Recipient{{"to", Recipient_EMAIL, Recipient_PENDING}}, + Rcpt: []*Recipient{ + {"to", Recipient_EMAIL, Recipient_PENDING, ""}}, Data: []byte("data"), }, CreatedAt: time.Now(), @@ -120,7 +121,7 @@ func TestFullQueue(t *testing.T) { } // This one should fail due to the queue being too big. - id, err := q.Put("from", []string{"to"}, []byte("data-qf")) + id, err := q.Put("host", "from", []string{"to"}, []byte("data-qf")) if err != errQueueFull { t.Errorf("Not failed as expected: %v - %v", id, err) } @@ -131,7 +132,7 @@ func TestFullQueue(t *testing.T) { q.q[oneID].WriteTo(q.path) q.Remove(oneID) - id, err = q.Put("from", []string{"to"}, []byte("data")) + id, err = q.Put("host", "from", []string{"to"}, []byte("data")) if err != nil { t.Errorf("Put: %v", err) } @@ -162,17 +163,17 @@ func TestAliases(t *testing.T) { expected []*Recipient }{ {[]string{"ab@loco"}, []*Recipient{ - {"pq@loco", Recipient_EMAIL, Recipient_PENDING}, - {"rs@loco", Recipient_EMAIL, Recipient_PENDING}, - {"command", Recipient_PIPE, Recipient_PENDING}}}, + {"pq@loco", Recipient_EMAIL, Recipient_PENDING, ""}, + {"rs@loco", Recipient_EMAIL, Recipient_PENDING, ""}, + {"command", Recipient_PIPE, Recipient_PENDING, ""}}}, {[]string{"ab@loco", "cd@loco"}, []*Recipient{ - {"pq@loco", Recipient_EMAIL, Recipient_PENDING}, - {"rs@loco", Recipient_EMAIL, Recipient_PENDING}, - {"command", Recipient_PIPE, Recipient_PENDING}, - {"ata@hualpa", Recipient_EMAIL, Recipient_PENDING}}}, + {"pq@loco", Recipient_EMAIL, Recipient_PENDING, ""}, + {"rs@loco", Recipient_EMAIL, Recipient_PENDING, ""}, + {"command", Recipient_PIPE, Recipient_PENDING, ""}, + {"ata@hualpa", Recipient_EMAIL, Recipient_PENDING, ""}}}, } for _, c := range cases { - id, err := q.Put("from", c.to, []byte("data")) + id, err := q.Put("host", "from", c.to, []byte("data")) if err != nil { t.Errorf("Put: %v", err) } @@ -192,7 +193,7 @@ func TestPipes(t *testing.T) { ID: <-newID, From: "from", Rcpt: []*Recipient{ - {"true", Recipient_PIPE, Recipient_PENDING}}, + {"true", Recipient_PIPE, Recipient_PENDING, ""}}, Data: []byte("data"), }, CreatedAt: time.Now(), diff --git a/test/t-03-queue_persistency/addtoqueue.go b/test/t-03-queue_persistency/addtoqueue.go index bfe7af8..03b2aab 100644 --- a/test/t-03-queue_persistency/addtoqueue.go +++ b/test/t-03-queue_persistency/addtoqueue.go @@ -35,8 +35,9 @@ func main() { Message: queue.Message{ ID: *id, From: *from, + To: []string{*rcpt}, Rcpt: []*queue.Recipient{ - {*rcpt, queue.Recipient_EMAIL, queue.Recipient_PENDING}, + {*rcpt, queue.Recipient_EMAIL, queue.Recipient_PENDING, ""}, }, Data: data, },