Commit 33995fe5 authored by Jeff R. Allen's avatar Jeff R. Allen Committed by Brad Fitzpatrick

log/syslog: retry once if write fails

Implements deferred connections + single-attempt automatic
retry. Based on CL 5078042 from kuroneko.

Fixes #2264.

R=mikioh.mikioh, rsc, bradfitz
CC=golang-dev
https://golang.org/cl/6782140
parent 7de3d717
...@@ -6,7 +6,11 @@ ...@@ -6,7 +6,11 @@
// Package syslog provides a simple interface to the system log // Package syslog provides a simple interface to the system log
// service. It can send messages to the syslog daemon using UNIX // service. It can send messages to the syslog daemon using UNIX
// domain sockets, UDP, or TCP connections. // domain sockets, UDP or TCP.
//
// Only one call to Dial is necessary. On write failures,
// the syslog client will attempt to reconnect to the server
// and write again.
package syslog package syslog
import ( import (
...@@ -15,6 +19,8 @@ import ( ...@@ -15,6 +19,8 @@ import (
"log" "log"
"net" "net"
"os" "os"
"strings"
"sync"
"time" "time"
) )
...@@ -78,15 +84,10 @@ type Writer struct { ...@@ -78,15 +84,10 @@ type Writer struct {
priority Priority priority Priority
tag string tag string
hostname string hostname string
conn serverConn network string
} raddr string
type serverConn interface {
writeString(p Priority, hostname, tag, s string) (int, error)
close() error
}
type netConn struct { mu sync.Mutex // guards conn
conn net.Conn conn net.Conn
} }
...@@ -101,7 +102,7 @@ func New(priority Priority, tag string) (w *Writer, err error) { ...@@ -101,7 +102,7 @@ func New(priority Priority, tag string) (w *Writer, err error) {
// address raddr on the network net. Each write to the returned // address raddr on the network net. Each write to the returned
// writer sends a log message with the given facility, severity and // writer sends a log message with the given facility, severity and
// tag. // tag.
func Dial(network, raddr string, priority Priority, tag string) (w *Writer, err error) { func Dial(network, raddr string, priority Priority, tag string) (*Writer, error) {
if priority < 0 || priority > LOG_LOCAL7|LOG_DEBUG { if priority < 0 || priority > LOG_LOCAL7|LOG_DEBUG {
return nil, errors.New("log/syslog: invalid priority") return nil, errors.New("log/syslog: invalid priority")
} }
...@@ -109,117 +110,160 @@ func Dial(network, raddr string, priority Priority, tag string) (w *Writer, err ...@@ -109,117 +110,160 @@ func Dial(network, raddr string, priority Priority, tag string) (w *Writer, err
if tag == "" { if tag == "" {
tag = os.Args[0] tag = os.Args[0]
} }
hostname, _ := os.Hostname() hostname, _ := os.Hostname()
var conn serverConn w := &Writer{
if network == "" { priority: priority,
conn, err = unixSyslog() tag: tag,
if hostname == "" { hostname: hostname,
hostname = "localhost" network: network,
} raddr: raddr,
} else {
var c net.Conn
c, err = net.Dial(network, raddr)
conn = netConn{c}
if hostname == "" {
hostname = c.LocalAddr().String()
}
} }
w.mu.Lock()
defer w.mu.Unlock()
err := w.connect()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return w, err
}
// connect makes a connection to the syslog server.
// It must be called with w.mu held.
func (w *Writer) connect() (err error) {
if w.conn != nil {
// ignore err from close, it makes sense to continue anyway
w.conn.Close()
w.conn = nil
}
return &Writer{priority: priority, tag: tag, hostname: hostname, conn: conn}, nil if w.network == "" {
w.conn, err = unixSyslog()
if w.hostname == "" {
w.hostname = "localhost"
}
} else {
var c net.Conn
c, err = net.Dial(w.network, w.raddr)
if err == nil {
w.conn = c
if w.hostname == "" {
w.hostname = c.LocalAddr().String()
}
}
}
return
} }
// Write sends a log message to the syslog daemon. // Write sends a log message to the syslog daemon.
func (w *Writer) Write(b []byte) (int, error) { func (w *Writer) Write(b []byte) (int, error) {
return w.writeString(w.priority, string(b)) return w.writeAndRetry(w.priority, string(b))
} }
func (w *Writer) Close() error { return w.conn.close() } // Close closes a connection to the syslog daemon.
func (w *Writer) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.conn != nil {
err := w.conn.Close()
w.conn = nil
return err
}
return nil
}
// Emerg logs a message with severity LOG_EMERG, ignoring the severity // Emerg logs a message with severity LOG_EMERG, ignoring the severity
// passed to New. // passed to New.
func (w *Writer) Emerg(m string) (err error) { func (w *Writer) Emerg(m string) (err error) {
_, err = w.writeString(LOG_EMERG, m) _, err = w.writeAndRetry(LOG_EMERG, m)
return err return err
} }
// Alert logs a message with severity LOG_ALERT, ignoring the severity // Alert logs a message with severity LOG_ALERT, ignoring the severity
// passed to New. // passed to New.
func (w *Writer) Alert(m string) (err error) { func (w *Writer) Alert(m string) (err error) {
_, err = w.writeString(LOG_ALERT, m) _, err = w.writeAndRetry(LOG_ALERT, m)
return err return err
} }
// Crit logs a message with severity LOG_CRIT, ignoring the severity // Crit logs a message with severity LOG_CRIT, ignoring the severity
// passed to New. // passed to New.
func (w *Writer) Crit(m string) (err error) { func (w *Writer) Crit(m string) (err error) {
_, err = w.writeString(LOG_CRIT, m) _, err = w.writeAndRetry(LOG_CRIT, m)
return err return err
} }
// Err logs a message with severity LOG_ERR, ignoring the severity // Err logs a message with severity LOG_ERR, ignoring the severity
// passed to New. // passed to New.
func (w *Writer) Err(m string) (err error) { func (w *Writer) Err(m string) (err error) {
_, err = w.writeString(LOG_ERR, m) _, err = w.writeAndRetry(LOG_ERR, m)
return err return err
} }
// Wanring logs a message with severity LOG_WARNING, ignoring the // Wanring logs a message with severity LOG_WARNING, ignoring the
// severity passed to New. // severity passed to New.
func (w *Writer) Warning(m string) (err error) { func (w *Writer) Warning(m string) (err error) {
_, err = w.writeString(LOG_WARNING, m) _, err = w.writeAndRetry(LOG_WARNING, m)
return err return err
} }
// Notice logs a message with severity LOG_NOTICE, ignoring the // Notice logs a message with severity LOG_NOTICE, ignoring the
// severity passed to New. // severity passed to New.
func (w *Writer) Notice(m string) (err error) { func (w *Writer) Notice(m string) (err error) {
_, err = w.writeString(LOG_NOTICE, m) _, err = w.writeAndRetry(LOG_NOTICE, m)
return err return err
} }
// Info logs a message with severity LOG_INFO, ignoring the severity // Info logs a message with severity LOG_INFO, ignoring the severity
// passed to New. // passed to New.
func (w *Writer) Info(m string) (err error) { func (w *Writer) Info(m string) (err error) {
_, err = w.writeString(LOG_INFO, m) _, err = w.writeAndRetry(LOG_INFO, m)
return err return err
} }
// Debug logs a message with severity LOG_DEBUG, ignoring the severity // Debug logs a message with severity LOG_DEBUG, ignoring the severity
// passed to New. // passed to New.
func (w *Writer) Debug(m string) (err error) { func (w *Writer) Debug(m string) (err error) {
_, err = w.writeString(LOG_DEBUG, m) _, err = w.writeAndRetry(LOG_DEBUG, m)
return err return err
} }
func (w *Writer) writeString(p Priority, s string) (int, error) { func (w *Writer) writeAndRetry(p Priority, s string) (int, error) {
return w.conn.writeString((w.priority&facilityMask)|(p&severityMask), pr := (w.priority & facilityMask) | (p & severityMask)
w.hostname, w.tag, s)
w.mu.Lock()
defer w.mu.Unlock()
if w.conn != nil {
if n, err := w.write(pr, s); err == nil {
return n, err
}
}
if err := w.connect(); err != nil {
return 0, err
}
return w.write(pr, s)
} }
// writeString: generates and writes a syslog formatted string. The // write generates and writes a syslog formatted string. The
// format is as follows: <PRI>TIMESTAMP HOSTNAME TAG[PID]: MSG // format is as follows: <PRI>TIMESTAMP HOSTNAME TAG[PID]: MSG
func (n netConn) writeString(p Priority, hostname, tag, msg string) (int, error) { func (w *Writer) write(p Priority, msg string) (int, error) {
// ensure it ends in a \n
nl := "" nl := ""
if len(msg) == 0 || msg[len(msg)-1] != '\n' { if !strings.HasSuffix(msg, "\n") {
nl = "\n" nl = "\n"
} }
timestamp := time.Now().Format(time.RFC3339) timestamp := time.Now().Format(time.RFC3339)
if _, err := fmt.Fprintf(n.conn, "<%d>%s %s %s[%d]: %s%s", p, timestamp, hostname, fmt.Fprintf(w.conn, "<%d>%s %s %s[%d]: %s%s",
tag, os.Getpid(), msg, nl); err != nil { p, timestamp, w.hostname,
return 0, err w.tag, os.Getpid(), msg, nl)
}
return len(msg), nil return len(msg), nil
} }
func (n netConn) close() error {
return n.conn.Close()
}
// NewLogger creates a log.Logger whose output is written to // NewLogger creates a log.Logger whose output is written to
// the system log service with the specified priority. The logFlag // the system log service with the specified priority. The logFlag
// argument is the flag set passed through to log.New to create // argument is the flag set passed through to log.New to create
......
...@@ -7,38 +7,150 @@ ...@@ -7,38 +7,150 @@
package syslog package syslog
import ( import (
"bufio"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"net" "net"
"os" "os"
"sync"
"testing" "testing"
"time" "time"
) )
var serverAddr string func runPktSyslog(c net.PacketConn, done chan<- string) {
func runSyslog(c net.PacketConn, done chan<- string) {
var buf [4096]byte var buf [4096]byte
var rcvd string var rcvd string
ct := 0
for { for {
var n int
var err error
c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, _, err := c.ReadFrom(buf[:]) n, _, err = c.ReadFrom(buf[:])
rcvd += string(buf[:n]) rcvd += string(buf[:n])
if err != nil { if err != nil {
if oe, ok := err.(*net.OpError); ok {
if ct < 3 && oe.Temporary() {
ct++
continue
}
}
break break
} }
} }
c.Close()
done <- rcvd done <- rcvd
} }
func startServer(done chan<- string) { var crashy = false
c, e := net.ListenPacket("udp", "127.0.0.1:0")
if e != nil { func runStreamSyslog(l net.Listener, done chan<- string) {
log.Fatalf("net.ListenPacket failed udp :0 %v", e) for {
var c net.Conn
var err error
if c, err = l.Accept(); err != nil {
fmt.Print(err)
return
}
go func(c net.Conn) {
b := bufio.NewReader(c)
for ct := 1; !crashy || ct&7 != 0; ct++ {
s, err := b.ReadString('\n')
if err != nil {
break
}
done <- s
}
c.Close()
}(c)
}
}
func startServer(n, la string, done chan<- string) (addr string) {
if n == "udp" || n == "tcp" {
la = "127.0.0.1:0"
} else {
// unix and unixgram: choose an address if none given
if la == "" {
// use ioutil.TempFile to get a name that is unique
f, err := ioutil.TempFile("", "syslogtest")
if err != nil {
log.Fatal("TempFile: ", err)
}
f.Close()
la = f.Name()
}
os.Remove(la)
}
if n == "udp" || n == "unixgram" {
l, e := net.ListenPacket(n, la)
if e != nil {
log.Fatalf("startServer failed: %v", e)
}
addr = l.LocalAddr().String()
go runPktSyslog(l, done)
} else {
l, e := net.Listen(n, la)
if e != nil {
log.Fatalf("startServer failed: %v", e)
}
addr = l.Addr().String()
go runStreamSyslog(l, done)
}
return
}
func TestWithSimulated(t *testing.T) {
msg := "Test 123"
transport := []string{"unix", "unixgram", "udp", "tcp"}
for _, tr := range transport {
done := make(chan string)
addr := startServer(tr, "", done)
s, err := Dial(tr, addr, LOG_INFO|LOG_USER, "syslog_test")
if err != nil {
t.Fatalf("Dial() failed: %v", err)
}
err = s.Info(msg)
if err != nil {
t.Fatalf("log failed: %v", err)
}
check(t, msg, <-done)
s.Close()
}
}
func TestFlap(t *testing.T) {
net := "unix"
done := make(chan string)
addr := startServer(net, "", done)
s, err := Dial(net, addr, LOG_INFO|LOG_USER, "syslog_test")
if err != nil {
t.Fatalf("Dial() failed: %v", err)
}
msg := "Moo 2"
err = s.Info(msg)
if err != nil {
t.Fatalf("log failed: %v", err)
}
check(t, msg, <-done)
// restart the server
startServer(net, addr, done)
// and try retransmitting
msg = "Moo 3"
err = s.Info(msg)
if err != nil {
t.Fatalf("log failed: %v", err)
} }
serverAddr = c.LocalAddr().String() check(t, msg, <-done)
go runSyslog(c, done)
s.Close()
} }
func TestNew(t *testing.T) { func TestNew(t *testing.T) {
...@@ -49,7 +161,8 @@ func TestNew(t *testing.T) { ...@@ -49,7 +161,8 @@ func TestNew(t *testing.T) {
// Depends on syslog daemon running, and sometimes it's not. // Depends on syslog daemon running, and sometimes it's not.
t.Skip("skipping syslog test during -short") t.Skip("skipping syslog test during -short")
} }
s, err := New(LOG_INFO|LOG_USER, "")
s, err := New(LOG_INFO|LOG_USER, "the_tag")
if err != nil { if err != nil {
t.Fatalf("New() failed: %s", err) t.Fatalf("New() failed: %s", err)
} }
...@@ -86,24 +199,15 @@ func TestDial(t *testing.T) { ...@@ -86,24 +199,15 @@ func TestDial(t *testing.T) {
l.Close() l.Close()
} }
func TestUDPDial(t *testing.T) { func check(t *testing.T, in, out string) {
done := make(chan string) tmpl := fmt.Sprintf("<%d>%%s %%s syslog_test[%%d]: %s\n", LOG_USER+LOG_INFO, in)
startServer(done)
l, err := Dial("udp", serverAddr, LOG_USER|LOG_INFO, "syslog_test")
if err != nil {
t.Fatalf("syslog.Dial() failed: %s", err)
}
msg := "udp test"
l.Info(msg)
expected := fmt.Sprintf("<%d>", LOG_USER+LOG_INFO) + "%s %s syslog_test[%d]: udp test\n"
rcvd := <-done
var parsedHostname, timestamp string
var pid int
if hostname, err := os.Hostname(); err != nil { if hostname, err := os.Hostname(); err != nil {
t.Fatalf("Error retrieving hostname") t.Error("Error retrieving hostname")
} else { } else {
if n, err := fmt.Sscanf(rcvd, expected, &timestamp, &parsedHostname, &pid); n != 3 || err != nil || hostname != parsedHostname { var parsedHostname, timestamp string
t.Fatalf("'%q', didn't match '%q' (%d, %s)", rcvd, expected, n, err) var pid int
if n, err := fmt.Sscanf(out, tmpl, &timestamp, &parsedHostname, &pid); n != 3 || err != nil || hostname != parsedHostname {
t.Errorf("Got %q, does not match template %q (%d %s)", out, tmpl, n, err)
} }
} }
} }
...@@ -126,24 +230,95 @@ func TestWrite(t *testing.T) { ...@@ -126,24 +230,95 @@ func TestWrite(t *testing.T) {
} else { } else {
for _, test := range tests { for _, test := range tests {
done := make(chan string) done := make(chan string)
startServer(done) addr := startServer("udp", "", done)
l, err := Dial("udp", serverAddr, test.pri, test.pre) l, err := Dial("udp", addr, test.pri, test.pre)
if err != nil { if err != nil {
t.Fatalf("syslog.Dial() failed: %s", err) t.Fatalf("syslog.Dial() failed: %v", err)
} }
_, err = io.WriteString(l, test.msg) _, err = io.WriteString(l, test.msg)
if err != nil { if err != nil {
t.Fatalf("WriteString() failed: %s", err) t.Fatalf("WriteString() failed: %v", err)
} }
rcvd := <-done rcvd := <-done
test.exp = fmt.Sprintf("<%d>", test.pri) + test.exp test.exp = fmt.Sprintf("<%d>", test.pri) + test.exp
var parsedHostname, timestamp string var parsedHostname, timestamp string
var pid int var pid int
if n, err := fmt.Sscanf(rcvd, test.exp, &timestamp, &parsedHostname, if n, err := fmt.Sscanf(rcvd, test.exp, &timestamp, &parsedHostname, &pid); n != 3 || err != nil || hostname != parsedHostname {
&pid); n != 3 || err != nil || hostname != parsedHostname { t.Errorf("s.Info() = '%q', didn't match '%q' (%d %s)", rcvd, test.exp, n, err)
t.Fatalf("'%q', didn't match '%q' (%d %s)", rcvd, test.exp,
n, err)
} }
} }
} }
} }
func TestConcurrentWrite(t *testing.T) {
addr := startServer("udp", "", make(chan string))
w, err := Dial("udp", addr, LOG_USER|LOG_ERR, "how's it going?")
if err != nil {
t.Fatalf("syslog.Dial() failed: %v", err)
}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
err := w.Info("test")
if err != nil {
t.Errorf("Info() failed: %v", err)
return
}
wg.Done()
}()
}
wg.Wait()
}
func TestConcurrentReconnect(t *testing.T) {
crashy = true
defer func() { crashy = false }()
net := "unix"
done := make(chan string)
addr := startServer(net, "", done)
// count all the messages arriving
count := make(chan int)
go func() {
ct := 0
for _ = range done {
ct++
// we are looking for 500 out of 1000 events
// here because lots of log messages are lost
// in buffers (kernel and/or bufio)
if ct > 500 {
break
}
}
count <- ct
}()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
w, err := Dial(net, addr, LOG_USER|LOG_ERR, "tag")
if err != nil {
t.Fatalf("syslog.Dial() failed: %v", err)
}
for i := 0; i < 100; i++ {
err := w.Info("test")
if err != nil {
t.Errorf("Info() failed: %v", err)
return
}
}
wg.Done()
}()
}
wg.Wait()
close(done)
select {
case <-count:
case <-time.After(100 * time.Millisecond):
t.Error("timeout in concurrent reconnect")
}
}
...@@ -14,18 +14,16 @@ import ( ...@@ -14,18 +14,16 @@ import (
// unixSyslog opens a connection to the syslog daemon running on the // unixSyslog opens a connection to the syslog daemon running on the
// local machine using a Unix domain socket. // local machine using a Unix domain socket.
func unixSyslog() (conn serverConn, err error) { func unixSyslog() (conn net.Conn, err error) {
logTypes := []string{"unixgram", "unix"} logTypes := []string{"unixgram", "unix"}
logPaths := []string{"/dev/log", "/var/run/syslog"} logPaths := []string{"/dev/log", "/var/run/syslog"}
var raddr string
for _, network := range logTypes { for _, network := range logTypes {
for _, path := range logPaths { for _, path := range logPaths {
raddr = path conn, err := net.Dial(network, path)
conn, err := net.Dial(network, raddr)
if err != nil { if err != nil {
continue continue
} else { } else {
return netConn{conn}, nil return conn, nil
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment