author | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-09-18 05:13:42 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2016-10-09 23:51:04 UTC |
parent | 9ed30a747b8ae0cb3c229933b4ba90d461e8abb9 |
chasquid.go | +11 | -5 |
chasquid_test.go | +1 | -0 |
internal/queue/queue.go | +147 | -45 |
internal/queue/queue.pb.go | +149 | -0 |
internal/queue/queue.proto | +38 | -0 |
internal/queue/queue_test.go | +19 | -12 |
test/t-03-queue_persistency/addtoqueue.go | +52 | -0 |
test/t-03-queue_persistency/config/chasquid.conf | +8 | -0 |
test/t-03-queue_persistency/content | +4 | -0 |
test/t-03-queue_persistency/hosts | +1 | -0 |
test/t-03-queue_persistency/run.sh | +25 | -0 |
diff --git a/chasquid.go b/chasquid.go index f58337f..76decbf 100644 --- a/chasquid.go +++ b/chasquid.go @@ -103,6 +103,8 @@ func main() { // as a remote domain (for loops, alias resolutions, etc.). s.AddDomain("localhost") + s.LoadQueue(conf.DataDir + "/queue") + // Load the addresses and listeners. systemdLs, err := systemd.Listeners() if err != nil { @@ -249,6 +251,15 @@ func (s *Server) AddUserDB(domain string, db *userdb.DB) { s.userDBs[domain] = db } +func (s *Server) LoadQueue(path string) { + q := queue.New(path, s.localDomains) + err := q.Load() + if err != nil { + glog.Fatalf("Error loading queue: %v", err) + } + s.queue = q +} + func (s *Server) getTLSConfig() (*tls.Config, error) { var err error conf := &tls.Config{} @@ -275,11 +286,6 @@ func (s *Server) ListenAndServe() { glog.Fatalf("Error loading TLS config: %v", err) } - // TODO: Create the queue when creating the server? - // Or even before, and just give it to the server? - s.queue = queue.New( - &courier.Procmail{}, &courier.SMTP{}, s.localDomains) - for m, addrs := range s.addrs { for _, addr := range addrs { // Listen. diff --git a/chasquid_test.go b/chasquid_test.go index 243d83d..eda11bf 100644 --- a/chasquid_test.go +++ b/chasquid_test.go @@ -428,6 +428,7 @@ func realMain(m *testing.M) int { s.AddCerts(tmpDir+"/cert.pem", tmpDir+"/key.pem") s.AddAddr(smtpAddr, ModeSMTP) s.AddAddr(submissionAddr, ModeSubmission) + s.LoadQueue(tmpDir + "/queue") udb := userdb.New("/dev/null") udb.AddUser("testuser", "testpasswd") diff --git a/internal/queue/queue.go b/internal/queue/queue.go index f5450cd..9553533 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -2,18 +2,26 @@ // Accepted envelopes get put in the queue, and processed asynchronously. package queue +// Command to generate queue.pb.go from queue.proto. +//go:generate protoc --go_out=. -I=${GOPATH}/src -I. queue.proto + import ( "crypto/rand" "encoding/base64" "fmt" + "os" + "path/filepath" "sync" "time" "blitiri.com.ar/go/chasquid/internal/courier" "blitiri.com.ar/go/chasquid/internal/envelope" + "blitiri.com.ar/go/chasquid/internal/protoio" "blitiri.com.ar/go/chasquid/internal/set" "github.com/golang/glog" + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/timestamp" "golang.org/x/net/trace" ) @@ -23,6 +31,13 @@ const ( // Give up sending attempts after this duration. giveUpAfter = 12 * time.Hour + + // Prefix for item file names. + // This is for convenience, versioning, and to be able to tell them apart + // temporary files and other cruft. + // It's important that it's outside the base64 space so it doesn't get + // generated accidentally. + itemFilePrefix = "m:" ) var ( @@ -68,17 +83,45 @@ type Queue struct { // Domains we consider local. localDomains *set.String + + // Path where we store the queue. + path string } -// TODO: Store the queue on disk. // Load the queue and launch the sending loops on startup. -func New(localC, remoteC courier.Courier, localDomains *set.String) *Queue { +func New(path string, localDomains *set.String) *Queue { + os.MkdirAll(path, 0700) + return &Queue{ q: map[string]*Item{}, - localC: localC, - remoteC: remoteC, + localC: &courier.Procmail{}, + remoteC: &courier.SMTP{}, localDomains: localDomains, + path: path, + } +} + +func (q *Queue) Load() error { + files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*") + if err != nil { + return err + } + + for _, fname := range files { + item, err := ItemFromFile(fname) + if err != nil { + glog.Errorf("error loading queue item from %q: %v", fname, err) + continue + } + + q.mu.Lock() + q.q[item.ID] = item + q.mu.Unlock() + + go item.SendLoop(q) } + + return nil } func (q *Queue) Len() int { @@ -94,13 +137,27 @@ func (q *Queue) Put(from string, to []string, data []byte) (string, error) { } item := &Item{ - ID: <-newID, - From: from, - To: to, - Data: data, - Created: time.Now(), - Results: map[string]error{}, + Message: Message{ + ID: <-newID, + From: from, + Data: data, + }, + CreatedAt: time.Now(), + } + + for _, t := range to { + item.Rcpt = append(item.Rcpt, &Recipient{ + Address: t, + Type: Recipient_EMAIL, + Status: Recipient_PENDING, + }) + } + + err := item.WriteTo(q.path) + if err != nil { + return "", fmt.Errorf("failed to write item: %v", err) } + q.mu.Lock() q.q[item.ID] = item q.mu.Unlock() @@ -115,6 +172,12 @@ func (q *Queue) Put(from string, to []string, data []byte) (string, error) { // Remove an item from the queue. func (q *Queue) Remove(id string) { + path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id) + err := os.Remove(path) + if err != nil { + glog.Errorf("failed to remove queue file %q: %v", path, err) + } + q.mu.Lock() delete(q.q, id) q.mu.Unlock() @@ -124,54 +187,70 @@ func (q *Queue) Remove(id string) { // Register it in main(). // An item in the queue. -// This must be easily serializable, so no pointers. type Item struct { - // Item ID. Uniquely identifies this item. - ID string + // Base the item on the protobuf message. + // We will use this for serialization, so any fields below are NOT + // serialized. + Message - // The envelope for this item. - From string - To []string - Data []byte + // Protect the entire item. + sync.Mutex - // Creation time. - Created time.Time + // Go-friendly version of Message.CreatedAtTs. + CreatedAt time.Time +} - // Next attempt to send. - NextAttempt time.Time +func ItemFromFile(fname string) (*Item, error) { + item := &Item{} + err := protoio.ReadTextMessage(fname, &item.Message) + if err != nil { + return nil, err + } - // Map of recipient -> last result of sending it. - Results map[string]error - mu sync.Mutex + item.CreatedAt, err = ptypes.Timestamp(item.CreatedAtTs) + return item, err } -func (item *Item) resultsFor(to string) (error, bool) { - item.mu.Lock() - defer item.mu.Unlock() - value, ok := item.Results[to] - return value, ok +func (item *Item) WriteTo(dir string) error { + item.Lock() + defer item.Unlock() + + var err error + item.CreatedAtTs, err = ptypes.TimestampProto(item.CreatedAt) + if err != nil { + return err + } + + path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID) + + return protoio.WriteTextMessage(path, &item.Message, 0600) } func (item *Item) SendLoop(q *Queue) { - defer q.Remove(item.ID) tr := trace.New("Queue", item.ID) defer tr.Finish() tr.LazyPrintf("from: %s", item.From) var delay time.Duration - for time.Since(item.Created) < giveUpAfter { + for time.Since(item.CreatedAt) < giveUpAfter { // Send to all recipients that are still pending. var wg sync.WaitGroup - for _, to := range item.To { - if err, ok := item.resultsFor(to); ok && err == nil { - // Successful send for this recipient, nothing to do. + for _, rcpt := range item.Rcpt { + item.Lock() + status := rcpt.Status + item.Unlock() + + if status != Recipient_PENDING { continue } wg.Add(1) - go func(to string) { + go func(rcpt *Recipient, oldStatus Recipient_Status) { defer wg.Done() + // TODO: Different types of recipients. + to := rcpt.Address + tr.LazyPrintf("%s sending", to) var err error @@ -183,9 +262,6 @@ func (item *Item) SendLoop(q *Queue) { } else { err = q.remoteC.Deliver(item.From, to, item.Data) } - item.mu.Lock() - item.Results[to] = err - item.mu.Unlock() if err != nil { // TODO: Local deliveries should not be retried, if they @@ -197,21 +273,39 @@ func (item *Item) SendLoop(q *Queue) { } else { tr.LazyPrintf("%s successful", to) glog.Infof("%s -> %q sent", item.ID, to) + + status = Recipient_SENT } - }(to) + + // Update + write on status change. + if oldStatus != status { + item.Lock() + rcpt.Status = status + item.Unlock() + + err = item.WriteTo(q.path) + if err != nil { + tr.LazyPrintf("failed to write: %v", err) + glog.Errorf("%s failed to write: %v", item.ID, err) + } + } + }(rcpt, status) } wg.Wait() - successful := 0 - for _, to := range item.To { - if err, ok := item.resultsFor(to); ok && err == nil { - successful++ + pending := 0 + for _, rcpt := range item.Rcpt { + if rcpt.Status == Recipient_PENDING { + pending++ } } - if successful == len(item.To) { + if pending == 0 { // Successfully sent to all recipients. + tr.LazyPrintf("all successful") glog.Infof("%s all successful", item.ID) + + q.Remove(item.ID) return } @@ -219,11 +313,13 @@ func (item *Item) SendLoop(q *Queue) { // that some of the messages have been delayed. delay = nextDelay(delay) + tr.LazyPrintf("waiting for %v", delay) glog.Infof("%s waiting for %v", item.ID, delay) time.Sleep(delay) } - // TODO: Send a notification message for the recipients we failed to send. + // TODO: Send a notification message for the recipients we failed to send, + // remove item from the queue, and remove from disk. } func nextDelay(last time.Duration) time.Duration { @@ -238,3 +334,9 @@ func nextDelay(last time.Duration) time.Duration { return 20 * time.Minute } } + +func timestampNow() *timestamp.Timestamp { + now := time.Now() + ts, _ := ptypes.TimestampProto(now) + return ts +} diff --git a/internal/queue/queue.pb.go b/internal/queue/queue.pb.go new file mode 100644 index 0000000..98fc7a1 --- /dev/null +++ b/internal/queue/queue.pb.go @@ -0,0 +1,149 @@ +// Code generated by protoc-gen-go. +// source: queue.proto +// DO NOT EDIT! + +/* +Package queue is a generated protocol buffer package. + +It is generated from these files: + queue.proto + +It has these top-level messages: + Message + Recipient +*/ +package queue + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import google_protobuf "github.com/golang/protobuf/ptypes/timestamp" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Recipient_Type int32 + +const ( + Recipient_EMAIL Recipient_Type = 0 +) + +var Recipient_Type_name = map[int32]string{ + 0: "EMAIL", +} +var Recipient_Type_value = map[string]int32{ + "EMAIL": 0, +} + +func (x Recipient_Type) String() string { + return proto.EnumName(Recipient_Type_name, int32(x)) +} +func (Recipient_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 0} } + +type Recipient_Status int32 + +const ( + Recipient_PENDING Recipient_Status = 0 + Recipient_SENT Recipient_Status = 1 + Recipient_FAILED Recipient_Status = 2 +) + +var Recipient_Status_name = map[int32]string{ + 0: "PENDING", + 1: "SENT", + 2: "FAILED", +} +var Recipient_Status_value = map[string]int32{ + "PENDING": 0, + "SENT": 1, + "FAILED": 2, +} + +func (x Recipient_Status) String() string { + return proto.EnumName(Recipient_Status_name, int32(x)) +} +func (Recipient_Status) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 1} } + +type Message struct { + // Message ID. Uniquely identifies this message, it is used for + // auditing and troubleshooting. + ID string `protobuf:"bytes,1,opt,name=ID,json=iD" json:"ID,omitempty"` + // The envelope for this message. + From string `protobuf:"bytes,2,opt,name=from" json:"from,omitempty"` + Rcpt []*Recipient `protobuf:"bytes,3,rep,name=rcpt" json:"rcpt,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` + // Creation timestamp. + CreatedAtTs *google_protobuf.Timestamp `protobuf:"bytes,5,opt,name=created_at_ts,json=createdAtTs" json:"created_at_ts,omitempty"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Message) GetRcpt() []*Recipient { + if m != nil { + return m.Rcpt + } + return nil +} + +func (m *Message) GetCreatedAtTs() *google_protobuf.Timestamp { + if m != nil { + return m.CreatedAtTs + } + return nil +} + +type Recipient struct { + Address string `protobuf:"bytes,1,opt,name=address" json:"address,omitempty"` + Type Recipient_Type `protobuf:"varint,2,opt,name=type,enum=queue.Recipient_Type" json:"type,omitempty"` + Status Recipient_Status `protobuf:"varint,3,opt,name=status,enum=queue.Recipient_Status" json:"status,omitempty"` +} + +func (m *Recipient) Reset() { *m = Recipient{} } +func (m *Recipient) String() string { return proto.CompactTextString(m) } +func (*Recipient) ProtoMessage() {} +func (*Recipient) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func init() { + proto.RegisterType((*Message)(nil), "queue.Message") + proto.RegisterType((*Recipient)(nil), "queue.Recipient") + proto.RegisterEnum("queue.Recipient_Type", Recipient_Type_name, Recipient_Type_value) + proto.RegisterEnum("queue.Recipient_Status", Recipient_Status_name, Recipient_Status_value) +} + +func init() { proto.RegisterFile("queue.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 329 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x8f, 0x41, 0x6b, 0xab, 0x40, + 0x14, 0x85, 0xa3, 0x31, 0xe6, 0xe5, 0xfa, 0x5e, 0xf0, 0x0d, 0x94, 0x4a, 0x56, 0x41, 0xba, 0x48, + 0x29, 0x28, 0xa4, 0xcb, 0x42, 0x21, 0xa0, 0x2d, 0x81, 0x26, 0x94, 0x89, 0xfb, 0x30, 0xd1, 0x89, + 0x15, 0x62, 0xc6, 0x3a, 0xd7, 0x45, 0x7f, 0x51, 0x7f, 0x47, 0xff, 0x59, 0xc7, 0x89, 0x69, 0xa1, + 0xdd, 0xdd, 0x3b, 0xe7, 0x9c, 0xb9, 0xdf, 0x01, 0xe7, 0xb5, 0xe1, 0x0d, 0x0f, 0xaa, 0x5a, 0xa0, + 0x20, 0x03, 0xbd, 0x4c, 0xee, 0xf2, 0x02, 0x5f, 0x9a, 0x5d, 0x90, 0x8a, 0x32, 0xcc, 0xc5, 0x81, + 0x1d, 0xf3, 0x50, 0xeb, 0xbb, 0x66, 0x1f, 0x56, 0xf8, 0x56, 0x71, 0x19, 0x62, 0x51, 0x72, 0x89, + 0xac, 0xac, 0xbe, 0xa7, 0xd3, 0x1f, 0xfe, 0xbb, 0x01, 0xc3, 0x15, 0x97, 0x92, 0xe5, 0x9c, 0x8c, + 0xc1, 0x5c, 0x46, 0x9e, 0x31, 0x35, 0x66, 0x23, 0x6a, 0x16, 0x11, 0x21, 0x60, 0xed, 0x6b, 0x51, + 0x7a, 0xa6, 0x7e, 0xd1, 0x33, 0xb9, 0x02, 0xab, 0x4e, 0x2b, 0xf4, 0xfa, 0xd3, 0xfe, 0xcc, 0x99, + 0xbb, 0xc1, 0x89, 0x87, 0xf2, 0xb4, 0xa8, 0x0a, 0x7e, 0x44, 0xaa, 0xd5, 0x36, 0x99, 0x31, 0x64, + 0x9e, 0xa5, 0x92, 0x7f, 0xa9, 0x9e, 0xc9, 0x3d, 0xfc, 0x4b, 0x6b, 0xce, 0x90, 0x67, 0x5b, 0x86, + 0x5b, 0x94, 0xde, 0x40, 0x89, 0xce, 0x7c, 0x12, 0xe4, 0x42, 0xe4, 0x87, 0xae, 0x93, 0x62, 0x0e, + 0x92, 0x33, 0x22, 0x75, 0xba, 0xc0, 0x02, 0x13, 0xe9, 0x7f, 0x18, 0x30, 0xfa, 0xba, 0x43, 0x3c, + 0x18, 0xb2, 0x2c, 0xab, 0x15, 0x79, 0x07, 0x7c, 0x5e, 0xc9, 0x35, 0x58, 0x6d, 0x69, 0x4d, 0x3d, + 0x9e, 0x5f, 0xfc, 0x24, 0x0c, 0x12, 0x25, 0x52, 0x6d, 0x21, 0x21, 0xd8, 0xea, 0x10, 0x36, 0x52, + 0xd5, 0x69, 0xcd, 0x97, 0xbf, 0xcc, 0x1b, 0x2d, 0xd3, 0xce, 0xe6, 0xff, 0x07, 0xab, 0x8d, 0x93, + 0x11, 0x0c, 0xe2, 0xd5, 0x62, 0xf9, 0xe4, 0xf6, 0xfc, 0x1b, 0xb0, 0x4f, 0x26, 0xe2, 0xc0, 0xf0, + 0x39, 0x5e, 0x47, 0xcb, 0xf5, 0xa3, 0xdb, 0x23, 0x7f, 0xc0, 0xda, 0xc4, 0xeb, 0xc4, 0x35, 0x08, + 0x80, 0xfd, 0xa0, 0xac, 0x71, 0xe4, 0x9a, 0x3b, 0x5b, 0x97, 0xbc, 0xfd, 0x0c, 0x00, 0x00, 0xff, + 0xff, 0x64, 0x3a, 0x92, 0xc1, 0xc7, 0x01, 0x00, 0x00, +} diff --git a/internal/queue/queue.proto b/internal/queue/queue.proto new file mode 100644 index 0000000..d01b8e7 --- /dev/null +++ b/internal/queue/queue.proto @@ -0,0 +1,38 @@ + +syntax = "proto3"; + +package queue; + +import "github.com/golang/protobuf/ptypes/timestamp/timestamp.proto"; + + +message Message { + // Message ID. Uniquely identifies this message, it is used for + // auditing and troubleshooting. + string ID = 1; + + // The envelope for this message. + string from = 2; + repeated Recipient rcpt = 3; + bytes data = 4; + + // Creation timestamp. + google.protobuf.Timestamp created_at_ts = 5; +} + +message Recipient { + string address = 1; + + enum Type { + EMAIL = 0; + } + Type type = 2; + + enum Status { + PENDING = 0; + SENT = 1; + FAILED = 2; + } + Status status = 3; +} + diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index a1ac39d..7f4571f 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -2,6 +2,7 @@ package queue import ( "bytes" + "fmt" "sync" "testing" "time" @@ -57,7 +58,9 @@ func newTestCourier() *TestCourier { func TestBasic(t *testing.T) { localC := newTestCourier() remoteC := newTestCourier() - q := New(localC, remoteC, set.NewString("loco")) + q := New("/tmp/queue_test", set.NewString("loco")) + q.localC = localC + q.remoteC = remoteC localC.wg.Add(2) remoteC.wg.Add(1) @@ -96,35 +99,39 @@ func TestBasic(t *testing.T) { } func TestFullQueue(t *testing.T) { - localC := newChanCourier() - remoteC := newChanCourier() - q := New(localC, remoteC, set.NewString()) + q := New("/tmp/queue_test", set.NewString()) // Force-insert maxQueueSize items in the queue. oneID := "" for i := 0; i < maxQueueSize; i++ { item := &Item{ - ID: <-newID, - From: "from", - To: []string{"to"}, - Data: []byte("data"), - Created: time.Now(), - Results: map[string]error{}, + Message: Message{ + ID: <-newID, + From: fmt.Sprintf("from-%d", i), + Rcpt: []*Recipient{{"to", Recipient_EMAIL, Recipient_PENDING}}, + Data: []byte("data"), + }, + CreatedAt: time.Now(), } q.q[item.ID] = item oneID = item.ID } // This one should fail due to the queue being too big. - id, err := q.Put("from", []string{"to"}, []byte("data")) + id, err := q.Put("from", []string{"to"}, []byte("data-qf")) if err != errQueueFull { t.Errorf("Not failed as expected: %v - %v", id, err) } // Remove one, and try again: it should succeed. + // Write it first so we don't get complaints about the file not existing + // (as we did not all the items properly). + q.q[oneID].WriteTo(q.path) q.Remove(oneID) - _, err = q.Put("from", []string{"to"}, []byte("data")) + + id, err = q.Put("from", []string{"to"}, []byte("data")) if err != nil { t.Errorf("Put: %v", err) } + q.Remove(id) } diff --git a/test/t-03-queue_persistency/addtoqueue.go b/test/t-03-queue_persistency/addtoqueue.go new file mode 100644 index 0000000..bfe7af8 --- /dev/null +++ b/test/t-03-queue_persistency/addtoqueue.go @@ -0,0 +1,52 @@ +// addtoqueue is a test helper which adds a queue item directly to the queue +// directory, behind chasquid's back. +// +// Note that chasquid does NOT support this, we do it before starting up the +// daemon for testing purposes only. +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "os" + "time" + + "blitiri.com.ar/go/chasquid/internal/queue" +) + +var ( + queueDir = flag.String("queue_dir", ".queue", "queue directory") + id = flag.String("id", "mid1234", "Message ID") + from = flag.String("from", "from", "Mail from") + rcpt = flag.String("rcpt", "rcpt", "Rcpt to") +) + +func main() { + flag.Parse() + + data, err := ioutil.ReadAll(os.Stdin) + if err != nil { + fmt.Printf("error reading data: %v\n", err) + os.Exit(1) + } + + item := &queue.Item{ + Message: queue.Message{ + ID: *id, + From: *from, + Rcpt: []*queue.Recipient{ + {*rcpt, queue.Recipient_EMAIL, queue.Recipient_PENDING}, + }, + Data: data, + }, + CreatedAt: time.Now(), + } + + os.MkdirAll(*queueDir, 0700) + err = item.WriteTo(*queueDir) + if err != nil { + fmt.Printf("error writing item: %v\n", err) + os.Exit(1) + } +} diff --git a/test/t-03-queue_persistency/config/chasquid.conf b/test/t-03-queue_persistency/config/chasquid.conf new file mode 100644 index 0000000..623bab4 --- /dev/null +++ b/test/t-03-queue_persistency/config/chasquid.conf @@ -0,0 +1,8 @@ +smtp_address: ":1025" +submission_address: ":1587" +monitoring_address: ":1099" + +mail_delivery_agent_bin: "test-mda" +mail_delivery_agent_args: "%user%@%domain%" + +data_dir: "../.data" diff --git a/test/t-03-queue_persistency/content b/test/t-03-queue_persistency/content new file mode 100644 index 0000000..76a8b16 --- /dev/null +++ b/test/t-03-queue_persistency/content @@ -0,0 +1,4 @@ +Subject: Prueba desde el test + +Crece desde el test el futuro +Crece desde el test diff --git a/test/t-03-queue_persistency/hosts b/test/t-03-queue_persistency/hosts new file mode 100644 index 0000000..2b9b623 --- /dev/null +++ b/test/t-03-queue_persistency/hosts @@ -0,0 +1 @@ +testserver localhost diff --git a/test/t-03-queue_persistency/run.sh b/test/t-03-queue_persistency/run.sh new file mode 100755 index 0000000..fc08806 --- /dev/null +++ b/test/t-03-queue_persistency/run.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +set -e +. $(dirname ${0})/../util/lib.sh + +init + +# Add an item to the queue before starting chasquid. +go run addtoqueue.go --queue_dir=.data/queue \ + --from someone@testserver \ + --rcpt someone@testserver \ + < content + +generate_certs_for testserver + +mkdir -p .logs +chasquid -v=2 --log_dir=.logs --config_dir=config & +wait_until_ready 1025 + +# Check that the item in the queue was delivered. +wait_for_file .mail/someone@testserver + +mail_diff content .mail/someone@testserver + +success