X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ef3e45fcc338a85432c207685567385972f79ee6..a8378b8deaa2bbf9d2c154d9d9bb072538c288cc:/services/crunch-run/logging.go diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 9d97384109..5254ff671e 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -4,11 +4,12 @@ import ( "bufio" "bytes" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "io" "log" "sync" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" ) // Timestamper is the signature for a function that takes a timestamp and @@ -18,7 +19,7 @@ type Timestamper func(t time.Time) string // Logging plumbing: // // ThrottledLogger.Logger -> ThrottledLogger.Write -> -// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter -> +// ThrottledLogger.buf -> ThrottledLogger.flusher -> // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create // // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull @@ -31,83 +32,86 @@ type ThrottledLogger struct { *log.Logger buf *bytes.Buffer sync.Mutex - writer io.WriteCloser - stop bool - flusherDone chan bool + writer io.WriteCloser + stopping chan struct{} + stopped chan struct{} Timestamper + Immediate *log.Logger } -// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision, -// because the RFC3339Nano format isn't fixed width. -const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00" +// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano. +const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" -// RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed -func RFC3339Timestamp(now time.Time) string { - return now.Format(RFC3339Fixed) +// RFC3339Timestamp formats t as RFC3339NanoFixed. +func RFC3339Timestamp(t time.Time) string { + return t.Format(RFC3339NanoFixed) } -// Write to the internal buffer. Prepend a timestamp to each line of the input -// data. +// Write prepends a timestamp to each line of the input data and +// appends to the internal buffer. Each line is also logged to +// tl.Immediate, if tl.Immediate is not nil. func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { tl.Mutex.Lock() + defer tl.Mutex.Unlock() + if tl.buf == nil { tl.buf = &bytes.Buffer{} } - defer tl.Mutex.Unlock() now := tl.Timestamper(time.Now().UTC()) sc := bufio.NewScanner(bytes.NewBuffer(p)) - for sc.Scan() { - _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text()) + for err == nil && sc.Scan() { + out := fmt.Sprintf("%s %s\n", now, sc.Bytes()) + if tl.Immediate != nil { + tl.Immediate.Print(out[:len(out)-1]) + } + _, err = io.WriteString(tl.buf, out) } - return len(p), err + if err == nil { + err = sc.Err() + if err == nil { + n = len(p) + } + } + return } // Periodically check the current buffer; if not empty, send it on the // channel to the goWriter goroutine. func (tl *ThrottledLogger) flusher() { - bufchan := make(chan *bytes.Buffer) - bufterm := make(chan bool) - - // Use a separate goroutine for the actual write so that the writes are - // actually initiated closer every 1s instead of every - // 1s + (time to it takes to write). - go goWriter(tl.writer, bufchan, bufterm) - for { - if !tl.stop { - time.Sleep(1 * time.Second) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for stopping := false; !stopping; { + select { + case <-tl.stopping: + // flush tl.buf, then exit the loop + stopping = true + case <-ticker.C: } + + var ready *bytes.Buffer + tl.Mutex.Lock() - if tl.buf != nil && tl.buf.Len() > 0 { - oldbuf := tl.buf - tl.buf = nil - tl.Mutex.Unlock() - bufchan <- oldbuf - } else if tl.stop { - tl.Mutex.Unlock() - break - } else { - tl.Mutex.Unlock() - } - } - close(bufchan) - <-bufterm - tl.flusherDone <- true -} + ready, tl.buf = tl.buf, nil + tl.Mutex.Unlock() -// Receive buffers from a channel and send to the underlying Writer -func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) { - for b := range c { - writer.Write(b.Bytes()) + if ready != nil && ready.Len() > 0 { + tl.writer.Write(ready.Bytes()) + } } - t <- true + close(tl.stopped) } // Close the flusher goroutine and wait for it to complete, then close the // underlying Writer. func (tl *ThrottledLogger) Close() error { - tl.stop = true - <-tl.flusherDone + select { + case <-tl.stopping: + // already stopped + default: + close(tl.stopping) + } + <-tl.stopped return tl.writer.Close() } @@ -154,17 +158,19 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { // (b) batches log messages and only calls the underlying Writer at most once // per second. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { - alw := &ThrottledLogger{} - alw.flusherDone = make(chan bool) - alw.writer = writer - alw.Logger = log.New(alw, "", 0) - alw.Timestamper = RFC3339Timestamp - go alw.flusher() - return alw + tl := &ThrottledLogger{} + tl.stopping = make(chan struct{}) + tl.stopped = make(chan struct{}) + tl.writer = writer + tl.Logger = log.New(tl, "", 0) + tl.Timestamper = RFC3339Timestamp + go tl.flusher() + return tl } -// ArvLogWriter implements a writer that writes to each of a WriteCloser -// (typically CollectionFileWriter) and creates an API server log entry. +// ArvLogWriter is an io.WriteCloser that processes each write by +// writing it through to another io.WriteCloser (typically a +// CollectionFileWriter) and creating an Arvados log entry. type ArvLogWriter struct { ArvClient IArvadosClient UUID string @@ -180,9 +186,10 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } // write to API - lr := arvadosclient.Dict{"object_uuid": arvlog.UUID, - "event_type": arvlog.loggingStream, - "properties": map[string]string{"text": string(p)}} + lr := arvadosclient.Dict{"log": arvadosclient.Dict{ + "object_uuid": arvlog.UUID, + "event_type": arvlog.loggingStream, + "properties": map[string]string{"text": string(p)}}} err2 := arvlog.ArvClient.Create("logs", lr, nil) if err1 != nil || err2 != nil {