git » chasquid » commit aacf8ff

queue: Implement persistency

author Alberto Bertogli
2016-09-18 05:13:42 UTC
committer Alberto Bertogli
2016-10-09 23:51:04 UTC
parent 9ed30a747b8ae0cb3c229933b4ba90d461e8abb9

queue: Implement persistency

This patch makes the queue read and write items to disk.

It uses protobuf for serialization. We serialize to text format to make
manual troubleshooting easier, as the performance difference is not very
relevant for us.

chasquid.go +11 -5
chasquid_test.go +1 -0
internal/queue/queue.go +147 -45
internal/queue/queue.pb.go +149 -0
internal/queue/queue.proto +38 -0
internal/queue/queue_test.go +19 -12
test/t-03-queue_persistency/addtoqueue.go +52 -0
test/t-03-queue_persistency/config/chasquid.conf +8 -0
test/t-03-queue_persistency/content +4 -0
test/t-03-queue_persistency/hosts +1 -0
test/t-03-queue_persistency/run.sh +25 -0

diff --git a/chasquid.go b/chasquid.go
index f58337f..76decbf 100644
--- a/chasquid.go
+++ b/chasquid.go
@@ -103,6 +103,8 @@ func main() {
 	// as a remote domain (for loops, alias resolutions, etc.).
 	s.AddDomain("localhost")
 
+	s.LoadQueue(conf.DataDir + "/queue")
+
 	// Load the addresses and listeners.
 	systemdLs, err := systemd.Listeners()
 	if err != nil {
@@ -249,6 +251,15 @@ func (s *Server) AddUserDB(domain string, db *userdb.DB) {
 	s.userDBs[domain] = db
 }
 
+func (s *Server) LoadQueue(path string) {
+	q := queue.New(path, s.localDomains)
+	err := q.Load()
+	if err != nil {
+		glog.Fatalf("Error loading queue: %v", err)
+	}
+	s.queue = q
+}
+
 func (s *Server) getTLSConfig() (*tls.Config, error) {
 	var err error
 	conf := &tls.Config{}
@@ -275,11 +286,6 @@ func (s *Server) ListenAndServe() {
 		glog.Fatalf("Error loading TLS config: %v", err)
 	}
 
-	// TODO: Create the queue when creating the server?
-	// Or even before, and just give it to the server?
-	s.queue = queue.New(
-		&courier.Procmail{}, &courier.SMTP{}, s.localDomains)
-
 	for m, addrs := range s.addrs {
 		for _, addr := range addrs {
 			// Listen.
diff --git a/chasquid_test.go b/chasquid_test.go
index 243d83d..eda11bf 100644
--- a/chasquid_test.go
+++ b/chasquid_test.go
@@ -428,6 +428,7 @@ func realMain(m *testing.M) int {
 		s.AddCerts(tmpDir+"/cert.pem", tmpDir+"/key.pem")
 		s.AddAddr(smtpAddr, ModeSMTP)
 		s.AddAddr(submissionAddr, ModeSubmission)
+		s.LoadQueue(tmpDir + "/queue")
 
 		udb := userdb.New("/dev/null")
 		udb.AddUser("testuser", "testpasswd")
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index f5450cd..9553533 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -2,18 +2,26 @@
 // Accepted envelopes get put in the queue, and processed asynchronously.
 package queue
 
+// Command to generate queue.pb.go from queue.proto.
+//go:generate protoc --go_out=. -I=${GOPATH}/src -I. queue.proto
+
 import (
 	"crypto/rand"
 	"encoding/base64"
 	"fmt"
+	"os"
+	"path/filepath"
 	"sync"
 	"time"
 
 	"blitiri.com.ar/go/chasquid/internal/courier"
 	"blitiri.com.ar/go/chasquid/internal/envelope"
+	"blitiri.com.ar/go/chasquid/internal/protoio"
 	"blitiri.com.ar/go/chasquid/internal/set"
 
 	"github.com/golang/glog"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/timestamp"
 	"golang.org/x/net/trace"
 )
 
@@ -23,6 +31,13 @@ const (
 
 	// Give up sending attempts after this duration.
 	giveUpAfter = 12 * time.Hour
+
+	// Prefix for item file names.
+	// This is for convenience, versioning, and to be able to tell them apart
+	// from temporary files and other cruft.
+	// It's important that it's outside the base64 space so it doesn't get
+	// generated accidentally.
+	itemFilePrefix = "m:"
 )
 
 var (
@@ -68,17 +83,45 @@ type Queue struct {
 
 	// Domains we consider local.
 	localDomains *set.String
+
+	// Path where we store the queue.
+	path string
 }
 
-// TODO: Store the queue on disk.
 // Load the queue and launch the sending loops on startup.
-func New(localC, remoteC courier.Courier, localDomains *set.String) *Queue {
+func New(path string, localDomains *set.String) *Queue {
+	os.MkdirAll(path, 0700)
+
 	return &Queue{
 		q:            map[string]*Item{},
-		localC:       localC,
-		remoteC:      remoteC,
+		localC:       &courier.Procmail{},
+		remoteC:      &courier.SMTP{},
 		localDomains: localDomains,
+		path:         path,
+	}
+}
+
+func (q *Queue) Load() error {
+	files, err := filepath.Glob(q.path + "/" + itemFilePrefix + "*")
+	if err != nil {
+		return err
+	}
+
+	for _, fname := range files {
+		item, err := ItemFromFile(fname)
+		if err != nil {
+			glog.Errorf("error loading queue item from %q: %v", fname, err)
+			continue
+		}
+
+		q.mu.Lock()
+		q.q[item.ID] = item
+		q.mu.Unlock()
+
+		go item.SendLoop(q)
 	}
+
+	return nil
 }
 
 func (q *Queue) Len() int {
@@ -94,13 +137,27 @@ func (q *Queue) Put(from string, to []string, data []byte) (string, error) {
 	}
 
 	item := &Item{
-		ID:      <-newID,
-		From:    from,
-		To:      to,
-		Data:    data,
-		Created: time.Now(),
-		Results: map[string]error{},
+		Message: Message{
+			ID:   <-newID,
+			From: from,
+			Data: data,
+		},
+		CreatedAt: time.Now(),
+	}
+
+	for _, t := range to {
+		item.Rcpt = append(item.Rcpt, &Recipient{
+			Address: t,
+			Type:    Recipient_EMAIL,
+			Status:  Recipient_PENDING,
+		})
+	}
+
+	err := item.WriteTo(q.path)
+	if err != nil {
+		return "", fmt.Errorf("failed to write item: %v", err)
 	}
+
 	q.mu.Lock()
 	q.q[item.ID] = item
 	q.mu.Unlock()
@@ -115,6 +172,12 @@ func (q *Queue) Put(from string, to []string, data []byte) (string, error) {
 
 // Remove an item from the queue.
 func (q *Queue) Remove(id string) {
+	path := fmt.Sprintf("%s/%s%s", q.path, itemFilePrefix, id)
+	err := os.Remove(path)
+	if err != nil {
+		glog.Errorf("failed to remove queue file %q: %v", path, err)
+	}
+
 	q.mu.Lock()
 	delete(q.q, id)
 	q.mu.Unlock()
@@ -124,54 +187,70 @@ func (q *Queue) Remove(id string) {
 // Register it in main().
 
 // An item in the queue.
-// This must be easily serializable, so no pointers.
 type Item struct {
-	// Item ID. Uniquely identifies this item.
-	ID string
+	// Base the item on the protobuf message.
+	// We will use this for serialization, so any fields below are NOT
+	// serialized.
+	Message
 
-	// The envelope for this item.
-	From string
-	To   []string
-	Data []byte
+	// Protect the entire item.
+	sync.Mutex
 
-	// Creation time.
-	Created time.Time
+	// Go-friendly version of Message.CreatedAtTs.
+	CreatedAt time.Time
+}
 
-	// Next attempt to send.
-	NextAttempt time.Time
+func ItemFromFile(fname string) (*Item, error) {
+	item := &Item{}
+	err := protoio.ReadTextMessage(fname, &item.Message)
+	if err != nil {
+		return nil, err
+	}
 
-	// Map of recipient -> last result of sending it.
-	Results map[string]error
-	mu      sync.Mutex
+	item.CreatedAt, err = ptypes.Timestamp(item.CreatedAtTs)
+	return item, err
 }
 
-func (item *Item) resultsFor(to string) (error, bool) {
-	item.mu.Lock()
-	defer item.mu.Unlock()
-	value, ok := item.Results[to]
-	return value, ok
+func (item *Item) WriteTo(dir string) error {
+	item.Lock()
+	defer item.Unlock()
+
+	var err error
+	item.CreatedAtTs, err = ptypes.TimestampProto(item.CreatedAt)
+	if err != nil {
+		return err
+	}
+
+	path := fmt.Sprintf("%s/%s%s", dir, itemFilePrefix, item.ID)
+
+	return protoio.WriteTextMessage(path, &item.Message, 0600)
 }
 
 func (item *Item) SendLoop(q *Queue) {
-	defer q.Remove(item.ID)
 
 	tr := trace.New("Queue", item.ID)
 	defer tr.Finish()
 	tr.LazyPrintf("from: %s", item.From)
 
 	var delay time.Duration
-	for time.Since(item.Created) < giveUpAfter {
+	for time.Since(item.CreatedAt) < giveUpAfter {
 		// Send to all recipients that are still pending.
 		var wg sync.WaitGroup
-		for _, to := range item.To {
-			if err, ok := item.resultsFor(to); ok && err == nil {
-				// Successful send for this recipient, nothing to do.
+		for _, rcpt := range item.Rcpt {
+			item.Lock()
+			status := rcpt.Status
+			item.Unlock()
+
+			if status != Recipient_PENDING {
 				continue
 			}
 
 			wg.Add(1)
-			go func(to string) {
+			go func(rcpt *Recipient, oldStatus Recipient_Status) {
 				defer wg.Done()
+				// TODO: Different types of recipients.
+				to := rcpt.Address
+
 				tr.LazyPrintf("%s sending", to)
 
 				var err error
@@ -183,9 +262,6 @@ func (item *Item) SendLoop(q *Queue) {
 				} else {
 					err = q.remoteC.Deliver(item.From, to, item.Data)
 				}
-				item.mu.Lock()
-				item.Results[to] = err
-				item.mu.Unlock()
 
 				if err != nil {
 					// TODO: Local deliveries should not be retried, if they
@@ -197,21 +273,39 @@ func (item *Item) SendLoop(q *Queue) {
 				} else {
 					tr.LazyPrintf("%s successful", to)
 					glog.Infof("%s  -> %q sent", item.ID, to)
+
+					status = Recipient_SENT
 				}
-			}(to)
+
+				// Update + write on status change.
+				if oldStatus != status {
+					item.Lock()
+					rcpt.Status = status
+					item.Unlock()
+
+					err = item.WriteTo(q.path)
+					if err != nil {
+						tr.LazyPrintf("failed to write: %v", err)
+						glog.Errorf("%s failed to write: %v", item.ID, err)
+					}
+				}
+			}(rcpt, status)
 		}
 		wg.Wait()
 
-		successful := 0
-		for _, to := range item.To {
-			if err, ok := item.resultsFor(to); ok && err == nil {
-				successful++
+		pending := 0
+		for _, rcpt := range item.Rcpt {
+			if rcpt.Status == Recipient_PENDING {
+				pending++
 			}
 		}
 
-		if successful == len(item.To) {
+		if pending == 0 {
 			// Successfully sent to all recipients.
+			tr.LazyPrintf("all successful")
 			glog.Infof("%s all successful", item.ID)
+
+			q.Remove(item.ID)
 			return
 		}
 
@@ -219,11 +313,13 @@ func (item *Item) SendLoop(q *Queue) {
 		// that some of the messages have been delayed.
 
 		delay = nextDelay(delay)
+		tr.LazyPrintf("waiting for %v", delay)
 		glog.Infof("%s waiting for %v", item.ID, delay)
 		time.Sleep(delay)
 	}
 
-	// TODO: Send a notification message for the recipients we failed to send.
+	// TODO: Send a notification message for the recipients we failed to send,
+	// remove item from the queue, and remove from disk.
 }
 
 func nextDelay(last time.Duration) time.Duration {
@@ -238,3 +334,9 @@ func nextDelay(last time.Duration) time.Duration {
 		return 20 * time.Minute
 	}
 }
+
+func timestampNow() *timestamp.Timestamp {
+	now := time.Now()
+	ts, _ := ptypes.TimestampProto(now)
+	return ts
+}
diff --git a/internal/queue/queue.pb.go b/internal/queue/queue.pb.go
new file mode 100644
index 0000000..98fc7a1
--- /dev/null
+++ b/internal/queue/queue.pb.go
@@ -0,0 +1,149 @@
+// Code generated by protoc-gen-go.
+// source: queue.proto
+// DO NOT EDIT!
+
+/*
+Package queue is a generated protocol buffer package.
+
+It is generated from these files:
+	queue.proto
+
+It has these top-level messages:
+	Message
+	Recipient
+*/
+package queue
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+import google_protobuf "github.com/golang/protobuf/ptypes/timestamp"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Recipient_Type int32
+
+const (
+	Recipient_EMAIL Recipient_Type = 0
+)
+
+var Recipient_Type_name = map[int32]string{
+	0: "EMAIL",
+}
+var Recipient_Type_value = map[string]int32{
+	"EMAIL": 0,
+}
+
+func (x Recipient_Type) String() string {
+	return proto.EnumName(Recipient_Type_name, int32(x))
+}
+func (Recipient_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 0} }
+
+type Recipient_Status int32
+
+const (
+	Recipient_PENDING Recipient_Status = 0
+	Recipient_SENT    Recipient_Status = 1
+	Recipient_FAILED  Recipient_Status = 2
+)
+
+var Recipient_Status_name = map[int32]string{
+	0: "PENDING",
+	1: "SENT",
+	2: "FAILED",
+}
+var Recipient_Status_value = map[string]int32{
+	"PENDING": 0,
+	"SENT":    1,
+	"FAILED":  2,
+}
+
+func (x Recipient_Status) String() string {
+	return proto.EnumName(Recipient_Status_name, int32(x))
+}
+func (Recipient_Status) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 1} }
+
+type Message struct {
+	// Message ID. Uniquely identifies this message, it is used for
+	// auditing and troubleshooting.
+	ID string `protobuf:"bytes,1,opt,name=ID,json=iD" json:"ID,omitempty"`
+	// The envelope for this message.
+	From string       `protobuf:"bytes,2,opt,name=from" json:"from,omitempty"`
+	Rcpt []*Recipient `protobuf:"bytes,3,rep,name=rcpt" json:"rcpt,omitempty"`
+	Data []byte       `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
+	// Creation timestamp.
+	CreatedAtTs *google_protobuf.Timestamp `protobuf:"bytes,5,opt,name=created_at_ts,json=createdAtTs" json:"created_at_ts,omitempty"`
+}
+
+func (m *Message) Reset()                    { *m = Message{} }
+func (m *Message) String() string            { return proto.CompactTextString(m) }
+func (*Message) ProtoMessage()               {}
+func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *Message) GetRcpt() []*Recipient {
+	if m != nil {
+		return m.Rcpt
+	}
+	return nil
+}
+
+func (m *Message) GetCreatedAtTs() *google_protobuf.Timestamp {
+	if m != nil {
+		return m.CreatedAtTs
+	}
+	return nil
+}
+
+type Recipient struct {
+	Address string           `protobuf:"bytes,1,opt,name=address" json:"address,omitempty"`
+	Type    Recipient_Type   `protobuf:"varint,2,opt,name=type,enum=queue.Recipient_Type" json:"type,omitempty"`
+	Status  Recipient_Status `protobuf:"varint,3,opt,name=status,enum=queue.Recipient_Status" json:"status,omitempty"`
+}
+
+func (m *Recipient) Reset()                    { *m = Recipient{} }
+func (m *Recipient) String() string            { return proto.CompactTextString(m) }
+func (*Recipient) ProtoMessage()               {}
+func (*Recipient) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func init() {
+	proto.RegisterType((*Message)(nil), "queue.Message")
+	proto.RegisterType((*Recipient)(nil), "queue.Recipient")
+	proto.RegisterEnum("queue.Recipient_Type", Recipient_Type_name, Recipient_Type_value)
+	proto.RegisterEnum("queue.Recipient_Status", Recipient_Status_name, Recipient_Status_value)
+}
+
+func init() { proto.RegisterFile("queue.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+	// 329 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x8f, 0x41, 0x6b, 0xab, 0x40,
+	0x14, 0x85, 0xa3, 0x31, 0xe6, 0xe5, 0xfa, 0x5e, 0xf0, 0x0d, 0x94, 0x4a, 0x56, 0x41, 0xba, 0x48,
+	0x29, 0x28, 0xa4, 0xcb, 0x42, 0x21, 0xa0, 0x2d, 0x81, 0x26, 0x94, 0x89, 0xfb, 0x30, 0xd1, 0x89,
+	0x15, 0x62, 0xc6, 0x3a, 0xd7, 0x45, 0x7f, 0x51, 0x7f, 0x47, 0xff, 0x59, 0xc7, 0x89, 0x69, 0xa1,
+	0xdd, 0xdd, 0x3b, 0xe7, 0x9c, 0xb9, 0xdf, 0x01, 0xe7, 0xb5, 0xe1, 0x0d, 0x0f, 0xaa, 0x5a, 0xa0,
+	0x20, 0x03, 0xbd, 0x4c, 0xee, 0xf2, 0x02, 0x5f, 0x9a, 0x5d, 0x90, 0x8a, 0x32, 0xcc, 0xc5, 0x81,
+	0x1d, 0xf3, 0x50, 0xeb, 0xbb, 0x66, 0x1f, 0x56, 0xf8, 0x56, 0x71, 0x19, 0x62, 0x51, 0x72, 0x89,
+	0xac, 0xac, 0xbe, 0xa7, 0xd3, 0x1f, 0xfe, 0xbb, 0x01, 0xc3, 0x15, 0x97, 0x92, 0xe5, 0x9c, 0x8c,
+	0xc1, 0x5c, 0x46, 0x9e, 0x31, 0x35, 0x66, 0x23, 0x6a, 0x16, 0x11, 0x21, 0x60, 0xed, 0x6b, 0x51,
+	0x7a, 0xa6, 0x7e, 0xd1, 0x33, 0xb9, 0x02, 0xab, 0x4e, 0x2b, 0xf4, 0xfa, 0xd3, 0xfe, 0xcc, 0x99,
+	0xbb, 0xc1, 0x89, 0x87, 0xf2, 0xb4, 0xa8, 0x0a, 0x7e, 0x44, 0xaa, 0xd5, 0x36, 0x99, 0x31, 0x64,
+	0x9e, 0xa5, 0x92, 0x7f, 0xa9, 0x9e, 0xc9, 0x3d, 0xfc, 0x4b, 0x6b, 0xce, 0x90, 0x67, 0x5b, 0x86,
+	0x5b, 0x94, 0xde, 0x40, 0x89, 0xce, 0x7c, 0x12, 0xe4, 0x42, 0xe4, 0x87, 0xae, 0x93, 0x62, 0x0e,
+	0x92, 0x33, 0x22, 0x75, 0xba, 0xc0, 0x02, 0x13, 0xe9, 0x7f, 0x18, 0x30, 0xfa, 0xba, 0x43, 0x3c,
+	0x18, 0xb2, 0x2c, 0xab, 0x15, 0x79, 0x07, 0x7c, 0x5e, 0xc9, 0x35, 0x58, 0x6d, 0x69, 0x4d, 0x3d,
+	0x9e, 0x5f, 0xfc, 0x24, 0x0c, 0x12, 0x25, 0x52, 0x6d, 0x21, 0x21, 0xd8, 0xea, 0x10, 0x36, 0x52,
+	0xd5, 0x69, 0xcd, 0x97, 0xbf, 0xcc, 0x1b, 0x2d, 0xd3, 0xce, 0xe6, 0xff, 0x07, 0xab, 0x8d, 0x93,
+	0x11, 0x0c, 0xe2, 0xd5, 0x62, 0xf9, 0xe4, 0xf6, 0xfc, 0x1b, 0xb0, 0x4f, 0x26, 0xe2, 0xc0, 0xf0,
+	0x39, 0x5e, 0x47, 0xcb, 0xf5, 0xa3, 0xdb, 0x23, 0x7f, 0xc0, 0xda, 0xc4, 0xeb, 0xc4, 0x35, 0x08,
+	0x80, 0xfd, 0xa0, 0xac, 0x71, 0xe4, 0x9a, 0x3b, 0x5b, 0x97, 0xbc, 0xfd, 0x0c, 0x00, 0x00, 0xff,
+	0xff, 0x64, 0x3a, 0x92, 0xc1, 0xc7, 0x01, 0x00, 0x00,
+}
diff --git a/internal/queue/queue.proto b/internal/queue/queue.proto
new file mode 100644
index 0000000..d01b8e7
--- /dev/null
+++ b/internal/queue/queue.proto
@@ -0,0 +1,38 @@
+
+syntax = "proto3";
+
+package queue;
+
+import "github.com/golang/protobuf/ptypes/timestamp/timestamp.proto";
+
+
+message Message {
+	// Message ID. Uniquely identifies this message, it is used for
+	// auditing and troubleshooting.
+	string ID = 1;
+
+	// The envelope for this message.
+	string from = 2;
+	repeated Recipient rcpt = 3;
+	bytes data = 4;
+
+	// Creation timestamp.
+	google.protobuf.Timestamp created_at_ts = 5;
+}
+
+message Recipient {
+	string address = 1;
+
+	enum Type {
+		EMAIL = 0;
+	}
+	Type type = 2;
+
+	enum Status {
+		PENDING = 0;
+		SENT = 1;
+		FAILED = 2;
+	}
+	Status status = 3;
+}
+
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index a1ac39d..7f4571f 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -2,6 +2,7 @@ package queue
 
 import (
 	"bytes"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -57,7 +58,9 @@ func newTestCourier() *TestCourier {
 func TestBasic(t *testing.T) {
 	localC := newTestCourier()
 	remoteC := newTestCourier()
-	q := New(localC, remoteC, set.NewString("loco"))
+	q := New("/tmp/queue_test", set.NewString("loco"))
+	q.localC = localC
+	q.remoteC = remoteC
 
 	localC.wg.Add(2)
 	remoteC.wg.Add(1)
@@ -96,35 +99,39 @@ func TestBasic(t *testing.T) {
 }
 
 func TestFullQueue(t *testing.T) {
-	localC := newChanCourier()
-	remoteC := newChanCourier()
-	q := New(localC, remoteC, set.NewString())
+	q := New("/tmp/queue_test", set.NewString())
 
 	// Force-insert maxQueueSize items in the queue.
 	oneID := ""
 	for i := 0; i < maxQueueSize; i++ {
 		item := &Item{
-			ID:      <-newID,
-			From:    "from",
-			To:      []string{"to"},
-			Data:    []byte("data"),
-			Created: time.Now(),
-			Results: map[string]error{},
+			Message: Message{
+				ID:   <-newID,
+				From: fmt.Sprintf("from-%d", i),
+				Rcpt: []*Recipient{{"to", Recipient_EMAIL, Recipient_PENDING}},
+				Data: []byte("data"),
+			},
+			CreatedAt: time.Now(),
 		}
 		q.q[item.ID] = item
 		oneID = item.ID
 	}
 
 	// This one should fail due to the queue being too big.
-	id, err := q.Put("from", []string{"to"}, []byte("data"))
+	id, err := q.Put("from", []string{"to"}, []byte("data-qf"))
 	if err != errQueueFull {
 		t.Errorf("Not failed as expected: %v - %v", id, err)
 	}
 
 	// Remove one, and try again: it should succeed.
+	// Write it first so we don't get complaints about the file not existing
+	// (as we did not add all the items properly).
+	q.q[oneID].WriteTo(q.path)
 	q.Remove(oneID)
-	_, err = q.Put("from", []string{"to"}, []byte("data"))
+
+	id, err = q.Put("from", []string{"to"}, []byte("data"))
 	if err != nil {
 		t.Errorf("Put: %v", err)
 	}
+	q.Remove(id)
 }
diff --git a/test/t-03-queue_persistency/addtoqueue.go b/test/t-03-queue_persistency/addtoqueue.go
new file mode 100644
index 0000000..bfe7af8
--- /dev/null
+++ b/test/t-03-queue_persistency/addtoqueue.go
@@ -0,0 +1,52 @@
+// addtoqueue is a test helper which adds a queue item directly to the queue
+// directory, behind chasquid's back.
+//
+// Note that chasquid does NOT support this; we do it before starting up the
+// daemon for testing purposes only.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"time"
+
+	"blitiri.com.ar/go/chasquid/internal/queue"
+)
+
+var (
+	queueDir = flag.String("queue_dir", ".queue", "queue directory")
+	id       = flag.String("id", "mid1234", "Message ID")
+	from     = flag.String("from", "from", "Mail from")
+	rcpt     = flag.String("rcpt", "rcpt", "Rcpt to")
+)
+
+func main() {
+	flag.Parse()
+
+	data, err := ioutil.ReadAll(os.Stdin)
+	if err != nil {
+		fmt.Printf("error reading data: %v\n", err)
+		os.Exit(1)
+	}
+
+	item := &queue.Item{
+		Message: queue.Message{
+			ID:   *id,
+			From: *from,
+			Rcpt: []*queue.Recipient{
+				{*rcpt, queue.Recipient_EMAIL, queue.Recipient_PENDING},
+			},
+			Data: data,
+		},
+		CreatedAt: time.Now(),
+	}
+
+	os.MkdirAll(*queueDir, 0700)
+	err = item.WriteTo(*queueDir)
+	if err != nil {
+		fmt.Printf("error writing item: %v\n", err)
+		os.Exit(1)
+	}
+}
diff --git a/test/t-03-queue_persistency/config/chasquid.conf b/test/t-03-queue_persistency/config/chasquid.conf
new file mode 100644
index 0000000..623bab4
--- /dev/null
+++ b/test/t-03-queue_persistency/config/chasquid.conf
@@ -0,0 +1,8 @@
+smtp_address: ":1025"
+submission_address: ":1587"
+monitoring_address: ":1099"
+
+mail_delivery_agent_bin: "test-mda"
+mail_delivery_agent_args: "%user%@%domain%"
+
+data_dir: "../.data"
diff --git a/test/t-03-queue_persistency/content b/test/t-03-queue_persistency/content
new file mode 100644
index 0000000..76a8b16
--- /dev/null
+++ b/test/t-03-queue_persistency/content
@@ -0,0 +1,4 @@
+Subject: Prueba desde el test
+
+Crece desde el test el futuro
+Crece desde el test
diff --git a/test/t-03-queue_persistency/hosts b/test/t-03-queue_persistency/hosts
new file mode 100644
index 0000000..2b9b623
--- /dev/null
+++ b/test/t-03-queue_persistency/hosts
@@ -0,0 +1 @@
+testserver localhost
diff --git a/test/t-03-queue_persistency/run.sh b/test/t-03-queue_persistency/run.sh
new file mode 100755
index 0000000..fc08806
--- /dev/null
+++ b/test/t-03-queue_persistency/run.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+set -e
+. $(dirname ${0})/../util/lib.sh
+
+init
+
+# Add an item to the queue before starting chasquid.
+go run addtoqueue.go --queue_dir=.data/queue \
+		--from someone@testserver \
+		--rcpt someone@testserver \
+		< content
+
+generate_certs_for testserver
+
+mkdir -p .logs
+chasquid -v=2 --log_dir=.logs --config_dir=config &
+wait_until_ready 1025
+
+# Check that the item in the queue was delivered.
+wait_for_file .mail/someone@testserver
+
+mail_diff content .mail/someone@testserver
+
+success