author | Alberto Bertogli
<albertito@blitiri.com.ar> 2020-06-07 10:07:01 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2020-06-07 11:48:52 UTC |
parent | 41398a49ea287a3fdaccaa53a4ca55e8b3013341 |
config/config.go | +15 | -4 |
gofer.go | +6 | -0 |
reqlog/reqlog.go | +180 | -0 |
server/http.go | +67 | -4 |
server/raw.go | +21 | -3 |
test/01-be.yaml | +7 | -0 |
test/01-fe.yaml | +9 | -1 |
util/util.go | +27 | -3 |
diff --git a/config/config.go b/config/config.go index 7fb2608..2e703a6 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,8 @@ type Config struct { HTTP map[string]HTTP HTTPS map[string]HTTPS Raw map[string]Raw + + ReqLog map[string]ReqLog } type HTTP struct { @@ -29,6 +31,8 @@ type HTTP struct { DirOpts map[string]DirOpts SetHeader map[string]map[string]string + + ReqLog map[string]string } type HTTPS struct { @@ -42,10 +46,17 @@ type DirOpts struct { } type Raw struct { - Addr string - Certs string - To string - ToTLS bool `yaml:"to_tls"` + Addr string + Certs string + To string + ToTLS bool `yaml:"to_tls"` + ReqLog string +} + +type ReqLog struct { + File string + BufSize int + Format string } func (c Config) String() string { diff --git a/gofer.go b/gofer.go index bc1c82b..aa86a88 100644 --- a/gofer.go +++ b/gofer.go @@ -6,6 +6,7 @@ import ( "blitiri.com.ar/go/gofer/config" "blitiri.com.ar/go/gofer/debug" + "blitiri.com.ar/go/gofer/reqlog" "blitiri.com.ar/go/gofer/server" "blitiri.com.ar/go/log" ) @@ -18,12 +19,17 @@ var ( func main() { flag.Parse() log.Init() + log.Infof("gofer starting") conf, err := config.Load(*configfile) if err != nil { log.Fatalf("error reading config file: %v", err) } + for name, rlog := range conf.ReqLog { + reqlog.FromConfig(name, rlog) + } + for addr, https := range conf.HTTPS { go server.HTTPS(addr, https) } diff --git a/reqlog/reqlog.go b/reqlog/reqlog.go new file mode 100644 index 0000000..065b3a0 --- /dev/null +++ b/reqlog/reqlog.go @@ -0,0 +1,180 @@ +package reqlog + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "text/template" + "time" + + "blitiri.com.ar/go/gofer/config" + "blitiri.com.ar/go/gofer/trace" + "blitiri.com.ar/go/log" +) + +type Log struct { + path string + f *os.File + evs chan *Event + reopen chan bool + tmpl *template.Template + + tr *trace.EventLog +} + +type Event struct { + T time.Time + H *http.Request + R *RawRequest + + Status int + Length int64 + + Latency time.Duration +} + +type RawRequest struct { + RemoteAddr net.Addr + LocalAddr net.Addr +} + +// Common log format, used by many servers. +// https://en.wikipedia.org/wiki/Common_Log_Format +// https://httpd.apache.org/docs/2.4/logs.html#common +const commonFormat = "{{.H.RemoteAddr}} - - [{{.T.Format \"02/Jan/2006:15:04:05 -0700\"}}] \"{{.H.Method}} {{.H.URL}} {{.H.Proto}}\" {{.Status}} {{.Length}}\n" + +// Combined log format, extension of the Common Log Format, and used by a lot +// of servers (e.g. Apache). +// https://httpd.apache.org/docs/2.4/logs.html#combined +const combinedFormat = "{{.H.RemoteAddr}} - - [{{.T.Format \"02/Jan/2006:15:04:05 -0700\"}}] \"{{.H.Method}} {{.H.URL}} {{.H.Proto}}\" {{.Status}} {{.Length}} {{.H.Header.Referer|q}} {{.H.Header.User-agent|q}}\n" + +// Extension of the combined log format, prepending the virtual host. +// https://httpd.apache.org/docs/2.4/logs.html#virtualhost +const combinedVHFormat = "{{.H.Host}} " + combinedFormat + +// lighttpd log is like combined, but the virtual host is put instead of the +// ident field. +const lighttpdFormat = "{{.H.RemoteAddr}} {{.H.Host}} - [{{.T.Format \"02/Jan/2006:15:04:05 -0700\"}}] \"{{.H.Method}} {{.H.URL}} {{.H.Proto}}\" {{.Status}} {{.Length}}\n" + +// gofer format, this is the default, and can handle both raw and HTTP events. +const goferFormat = "{{.T.Format \"2006-01-02 15:04:05.000\"}}" + + "{{if .H}} {{.H.RemoteAddr}} {{.H.Proto}} {{.H.Host}} {{.H.Method}} {{.H.URL}}{{end}}" + + "{{if .R}} {{.R.RemoteAddr}} raw {{.R.LocalAddr}}{{end}}" + + " = {{.Status}} {{.Length}}b {{.Latency.Milliseconds}}ms\n" + +var knownFormats = map[string]string{ + "<common>": commonFormat, + "<combined>": combinedFormat, + "<combinedvh>": combinedVHFormat, + "<lighttpd>": lighttpdFormat, + "<gofer>": goferFormat, + "": goferFormat, +} + +func New(path string, nbuf int, format string) (*Log, error) { + var err error + h := &Log{} + + if f, ok := knownFormats[format]; ok { + format = f + } + h.tmpl = template.New(path) + h.tmpl.Funcs(template.FuncMap{ + "q": quoteString, + }) + _, err = h.tmpl.Parse(format) + if err != nil { + return nil, err + } + + switch path { + case "<stdout>": + h.f = os.Stdout + case "<stderr>": + h.f = os.Stderr + default: + // TODO: stdout/stderr (and their reopen). + h.f, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + h.path = path + } + + h.evs = make(chan *Event, nbuf) + h.reopen = make(chan bool, 1) + h.tr = trace.NewEventLog("reqlog", path) + + go h.run() + return h, nil +} + +func (h *Log) run() { + var err error + for { + select { + case e := <-h.evs: + err = h.tmpl.Execute(h.f, e) + if err != nil { + h.tr.Errorf("error logging: %v", err) + } + case <-h.reopen: + if h.path != "" { + h.f.Close() + h.f, err = os.OpenFile( + h.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + h.tr.Errorf("error reopening: %v", err) + } + } + } + } +} + +func (h *Log) Log(e *Event) { + h.evs <- e +} + +func (h *Log) Reopen() { + h.reopen <- true +} + +// Global registry for convenience. +// This is not pretty but it simplifies a lot of the handling for now. +var registry = map[string]*Log{} + +func FromConfig(name string, conf config.ReqLog) error { + h, err := New(conf.File, conf.BufSize, conf.Format) + if err != nil { + log.Fatalf("reqlog %q failed to initialize: %v", name, err) + return err + } + registry[name] = h + log.Infof("reqlog %q writing to %q", name, conf.File) + return nil +} + +func FromName(name string) *Log { + return registry[name] +} + +type ctxKeyT string + +const ctxKey = ctxKeyT("reqlog") + +func NewContext(ctx context.Context, log *Log) context.Context { + return context.WithValue(ctx, ctxKey, log) +} +func FromContext(ctx context.Context) *Log { + v := ctx.Value(ctxKey) + if v == nil { + return nil + } + return v.(*Log) +} + +func quoteString(s string) string { + return fmt.Sprintf("%q", s) +} diff --git a/server/http.go b/server/http.go index b4e8afd..90bd4e6 100644 --- a/server/http.go +++ b/server/http.go @@ -14,6 +14,7 @@ import ( "time" "blitiri.com.ar/go/gofer/config" + "blitiri.com.ar/go/gofer/reqlog" "blitiri.com.ar/go/gofer/trace" "blitiri.com.ar/go/gofer/util" "blitiri.com.ar/go/log" @@ -63,7 +64,6 @@ func httpServer(addr string, conf config.HTTP) *http.Server { // Wrap the authentication handlers. if len(conf.Auth) > 0 { authMux := http.NewServeMux() - authMux.Handle("/", srv.Handler) for path, dbPath := range conf.Auth { users, err := LoadAuthFile(dbPath) if err != nil { @@ -78,22 +78,46 @@ func httpServer(addr string, conf config.HTTP) *http.Server { log.Infof("%s auth %q -> %q", srv.Addr, path, dbPath) } + + if _, ok := conf.Auth["/"]; !ok { + authMux.Handle("/", srv.Handler) + } srv.Handler = authMux } // Extra headers. if len(conf.SetHeader) > 0 { hdrMux := http.NewServeMux() - hdrMux.Handle("/", srv.Handler) for path, extraHdrs := range conf.SetHeader { hdrMux.Handle(path, SetHeader(srv.Handler, extraHdrs)) log.Infof("%s add headers %q -> %q", srv.Addr, path, extraHdrs) } + + if _, ok := conf.SetHeader["/"]; !ok { + hdrMux.Handle("/", srv.Handler) + } srv.Handler = hdrMux } srv.Handler = WithTrace("http@"+srv.Addr, srv.Handler) + if len(conf.ReqLog) > 0 { + logMux := http.NewServeMux() + for path, logName := range conf.ReqLog { + l := reqlog.FromName(logName) + if l == nil { + log.Fatalf("unknown reqlog name %q", logName) + } + logMux.Handle(path, WithReqLog(srv.Handler, l)) + log.Infof("%s reqlog %q to %q", srv.Addr, path, logName) + } + + if _, ok := conf.ReqLog["/"]; !ok { + logMux.Handle("/", srv.Handler) + } + srv.Handler = logMux + } + return srv } @@ -170,6 +194,11 @@ func makeProxy(from string, to url.URL, conf *config.HTTP) http.Handler { // hosts. The downside is that if the destination scheme is HTTPS, // this causes issues with the TLS SNI negotiation. //req.Host = to.Host + + // Record the start time, so we can compute end to end latency. + // We use WithContext instead of Clone since a shallow copy is fine in + // this context, and faster. + *req = *req.WithContext(util.NewLatencyContext(req.Context())) } return newReverseProxy(proxy) @@ -343,6 +372,9 @@ func (t *loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) if response.StatusCode >= 400 && response.StatusCode != 404 { tr.SetError() } + + reqLog(req, response.StatusCode, response.ContentLength, + util.LatencyFromContext(req.Context())) } else { // errorHandler will be invoked when err != nil, avoid double error // logging. @@ -380,6 +412,8 @@ func (p *reverseProxy) errorHandler(w http.ResponseWriter, r *http.Request, err tr.SetError() } + reqLog(r, http.StatusBadGateway, 0, util.LatencyFromContext(r.Context())) + w.WriteHeader(http.StatusBadGateway) } @@ -387,7 +421,7 @@ func (p *reverseProxy) errorHandler(w http.ResponseWriter, r *http.Request, err type statusWriter struct { http.ResponseWriter status int - length int + length int64 } func (w *statusWriter) WriteHeader(status int) { @@ -397,7 +431,7 @@ func (w *statusWriter) WriteHeader(status int) { func (w *statusWriter) Write(b []byte) (int, error) { n, err := w.ResponseWriter.Write(b) - w.length += n + w.length += int64(n) return n, err } @@ -442,12 +476,41 @@ func WithLogging(parent http.Handler) http.Handler { // Wrap the writer so we can get output information. sw := statusWriter{ResponseWriter: w} + start := time.Now() parent.ServeHTTP(&sw, r) + lat := time.Since(start) + tr.Printf("%d %s", sw.status, http.StatusText(sw.status)) tr.Printf("%d bytes", sw.length) if sw.status >= 400 && sw.status != 404 { tr.SetError() } + + reqLog(r, sw.status, sw.length, lat) + }) +} + +func WithReqLog(parent http.Handler, rl *reqlog.Log) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Associate the log with this request. Actual logging will be + // performed within the handlers (see WithLogging). + r = r.WithContext(reqlog.NewContext(r.Context(), rl)) + + parent.ServeHTTP(w, r) + }) +} + +func reqLog(r *http.Request, status int, length int64, latency time.Duration) { + rlog := reqlog.FromContext(r.Context()) + if rlog == nil { + return + } + rlog.Log(&reqlog.Event{ + T: time.Now(), + H: r, + Status: status, + Length: length, + Latency: latency, }) } diff --git a/server/raw.go b/server/raw.go index 067ab25..9fd2ff7 100644 --- a/server/raw.go +++ b/server/raw.go @@ -4,8 +4,10 @@ import ( "crypto/tls" "fmt" "net" + "time" "blitiri.com.ar/go/gofer/config" + "blitiri.com.ar/go/gofer/reqlog" "blitiri.com.ar/go/gofer/trace" "blitiri.com.ar/go/gofer/util" "blitiri.com.ar/go/log" @@ -34,6 +36,8 @@ func Raw(addr string, conf config.Raw) { log.Fatalf("Raw proxy error listening on %q: %v", addr, err) } + rlog := reqlog.FromName(conf.ReqLog) + log.Infof("Raw proxy on %q (%q)", addr, lis.Addr()) for { conn, err := lis.Accept() @@ -41,12 +45,13 @@ func Raw(addr string, conf config.Raw) { log.Fatalf("%s error accepting: %v", addr, err) } - go forward(conn, conf.To, conf.ToTLS) + go forward(conn, conf.To, conf.ToTLS, rlog) } } -func forward(src net.Conn, dstAddr string, dstTLS bool) { +func forward(src net.Conn, dstAddr string, dstTLS bool, rlog *reqlog.Log) { defer src.Close() + start := time.Now() tr := trace.New("raw", fmt.Sprintf("%s -> %s", src.LocalAddr(), dstAddr)) defer tr.Finish() @@ -71,7 +76,20 @@ func forward(src net.Conn, dstAddr string, dstTLS bool) { tr.Printf("dial complete: %v -> %v", dst.LocalAddr(), dst.RemoteAddr()) - util.BidirCopy(src, dst) + nbytes := util.BidirCopy(src, dst) + latency := time.Since(start) tr.Printf("copy complete") + if rlog != nil { + rlog.Log(&reqlog.Event{ + T: time.Now(), + R: &reqlog.RawRequest{ + RemoteAddr: src.RemoteAddr(), + LocalAddr: src.LocalAddr(), + }, + Status: 200, + Length: nbytes, + Latency: latency, + }) + } } diff --git a/test/01-be.yaml b/test/01-be.yaml index 1a2f16a..0d88f4f 100644 --- a/test/01-be.yaml +++ b/test/01-be.yaml @@ -1,6 +1,11 @@ control_addr: "127.0.0.1:8459" +reqlog: + "requests": + file: ".01-be.requests.log" + buffer: 10 + http: ":8450": dir: @@ -28,3 +33,5 @@ http: "/file": "X-My-Header": "my lovely header" + reqlog: + "/": requests diff --git a/test/01-fe.yaml b/test/01-fe.yaml index 3aeb1d3..7838e98 100644 --- a/test/01-fe.yaml +++ b/test/01-fe.yaml @@ -13,26 +13,34 @@ _proxy: &proxyroutes _redirect: &redirect "/gogo/": "https://google.com" +reqlog: + "requests": + file: ".01-fe.requests.log" http: ":8441": proxy: *proxyroutes redirect: *redirect + reqlog: + "/": "requests" https: ":8442": certs: ".certs" proxy: *proxyroutes redirect: *redirect - + reqlog: + "/": "requests" # Raw proxy to the same backend. raw: ":8445": to: "localhost:8450" + reqlog: "requests" ":8446": to: "localhost:8450" certs: ".certs" + reqlog: "requests" diff --git a/util/util.go b/util/util.go index e2e0ead..814931c 100644 --- a/util/util.go +++ b/util/util.go @@ -2,12 +2,15 @@ package util import ( + "context" "crypto/tls" "fmt" "io" "io/ioutil" "os" "path/filepath" + "sync/atomic" + "time" ) // LoadCerts loads certificates from the given directory, and returns a TLS @@ -53,16 +56,19 @@ func LoadCerts(certDir string) (*tls.Config, error) { return tlsConfig, nil } -func BidirCopy(src, dst io.ReadWriter) { +func BidirCopy(src, dst io.ReadWriter) int64 { done := make(chan bool, 2) + var total int64 go func() { - io.Copy(src, dst) + n, _ := io.Copy(src, dst) + atomic.AddInt64(&total, n) done <- true }() go func() { - io.Copy(dst, src) + n, _ := io.Copy(dst, src) + atomic.AddInt64(&total, n) done <- true }() @@ -70,4 +76,22 @@ func BidirCopy(src, dst io.ReadWriter) { // The other goroutine will remain alive, it is up to the caller to create // the conditions to complete it (e.g. by closing one of the sides). <-done + + return atomic.LoadInt64(&total) +} + +type latKeyT string + +const latKey = latKeyT("latency") + +func NewLatencyContext(ctx context.Context) context.Context { + return context.WithValue(ctx, latKey, time.Now()) +} + +func LatencyFromContext(ctx context.Context) time.Duration { + v := ctx.Value(latKey) + if v == nil { + return time.Duration(0) + } + return time.Since(v.(time.Time)) }