X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0568e6de6a234ea25a3654ac4a68ad4b45a99ba9..24f455da9cb2bf7a3c519912abbc15292a414655:/services/crunch-run/logging.go diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 3860484747..4f8f95c9a6 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -18,10 +18,10 @@ type Timestamper func(t time.Time) string // Logging plumbing: // // ThrottledLogger.Logger -> ThrottledLogger.Write -> -// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter -> +// ThrottledLogger.buf -> ThrottledLogger.flusher -> // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create // -// For stdout/stderr CopyReaderToLog additionally runs as a goroutine to pull +// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull // data from the stdout/stderr Reader and send to the Logger. // ThrottledLogger accepts writes, prepends a timestamp to each line of the @@ -35,72 +35,71 @@ type ThrottledLogger struct { stop bool flusherDone chan bool Timestamper + Immediate *log.Logger } -// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision, -// because the RFC3339Nano format isn't fixed width. -const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00" +// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano. +const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" -// RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed -func RFC3339Timestamp(now time.Time) string { - return now.Format(RFC3339Fixed) +// RFC3339Timestamp formats t as RFC3339NanoFixed. +func RFC3339Timestamp(t time.Time) string { + return t.Format(RFC3339NanoFixed) } -// Write to the internal buffer. Prepend a timestamp to each line of the input -// data. +// Write prepends a timestamp to each line of the input data and +// appends to the internal buffer. Each line is also logged to +// tl.Immediate, if tl.Immediate is not nil. func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { tl.Mutex.Lock() + defer tl.Mutex.Unlock() + if tl.buf == nil { tl.buf = &bytes.Buffer{} } - defer tl.Mutex.Unlock() now := tl.Timestamper(time.Now().UTC()) sc := bufio.NewScanner(bytes.NewBuffer(p)) - for sc.Scan() { - _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text()) + for err == nil && sc.Scan() { + out := fmt.Sprintf("%s %s\n", now, sc.Bytes()) + if tl.Immediate != nil { + tl.Immediate.Print(out[:len(out)-1]) + } + _, err = io.WriteString(tl.buf, out) + } + if err == nil { + err = sc.Err() + if err == nil { + n = len(p) + } } - return len(p), err + return } // Periodically check the current buffer; if not empty, send it on the // channel to the goWriter goroutine. func (tl *ThrottledLogger) flusher() { - bufchan := make(chan *bytes.Buffer) - bufterm := make(chan bool) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for range ticker.C { + // We use a separate "stopping" var here to ensure we flush + // tl.buf after tl.stop becomes true. + stopping := tl.stop + + var ready *bytes.Buffer - // Use a separate goroutine for the actual write so that the writes are - // actually initiated closer every 1s instead of every - // 1s + (time to it takes to write). - go goWriter(tl.writer, bufchan, bufterm) - for { - if !tl.stop { - time.Sleep(1 * time.Second) - } tl.Mutex.Lock() - if tl.buf != nil && tl.buf.Len() > 0 { - oldbuf := tl.buf - tl.buf = nil - tl.Mutex.Unlock() - bufchan <- oldbuf - } else if tl.stop { - tl.Mutex.Unlock() - break - } else { - tl.Mutex.Unlock() + ready, tl.buf = tl.buf, nil + tl.Mutex.Unlock() + + if ready != nil && ready.Len() > 0 { + tl.writer.Write(ready.Bytes()) } - } - close(bufchan) - <-bufterm - tl.flusherDone <- true -} -// Receive buffers from a channel and send to the underlying Writer -func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) { - for b := range c { - writer.Write(b.Bytes()) + if stopping { + break + } } - t <- true + close(tl.flusherDone) } // Close the flusher goroutine and wait for it to complete, then close the @@ -116,9 +115,9 @@ const ( MaxLogLine = 1 << 12 ) -// CopyReaderToLog reads from a Reader and prints to a Logger, with long line -// splitting. -func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) { +// ReadWriteLines reads lines from a reader and writes to a Writer, with long +// line splitting. +func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { reader := bufio.NewReaderSize(in, MaxLogLine) var prefix string for { @@ -126,13 +125,19 @@ func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) { if err == io.EOF { break } else if err != nil { - logger.Print("error reading container log:", err) + writer.Write([]byte(fmt.Sprintln("error reading container log:", err))) } var suffix string if isPrefix { - suffix = "[...]" + suffix = "[...]\n" } - logger.Print(prefix, string(line), suffix) + + if prefix == "" && suffix == "" { + writer.Write(line) + } else { + writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix))) + } + // Set up prefix for following line if isPrefix { prefix = "[...]" @@ -148,17 +153,18 @@ func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) { // (b) batches log messages and only calls the underlying Writer at most once // per second. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { - alw := &ThrottledLogger{} - alw.flusherDone = make(chan bool) - alw.writer = writer - alw.Logger = log.New(alw, "", 0) - alw.Timestamper = RFC3339Timestamp - go alw.flusher() - return alw + tl := &ThrottledLogger{} + tl.flusherDone = make(chan bool) + tl.writer = writer + tl.Logger = log.New(tl, "", 0) + tl.Timestamper = RFC3339Timestamp + go tl.flusher() + return tl } -// ArvLogWriter implements a writer that writes to each of a WriteCloser -// (typically CollectionFileWriter) and creates an API server log entry. +// ArvLogWriter is an io.WriteCloser that processes each write by +// writing it through to another io.WriteCloser (typically a +// CollectionFileWriter) and creating an Arvados log entry. type ArvLogWriter struct { ArvClient IArvadosClient UUID string @@ -174,9 +180,10 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } // write to API - lr := arvadosclient.Dict{"object_uuid": arvlog.UUID, - "event_type": arvlog.loggingStream, - "properties": map[string]string{"text": string(p)}} + lr := arvadosclient.Dict{"log": arvadosclient.Dict{ + "object_uuid": arvlog.UUID, + "event_type": arvlog.loggingStream, + "properties": map[string]string{"text": string(p)}}} err2 := arvlog.ArvClient.Create("logs", lr, nil) if err1 != nil || err2 != nil {