14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Timestamper is the signature of a function that turns a timestamp into
// the string that ThrottledLogger prefixes to each line it writes.
type Timestamper func(ts time.Time) string
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer. It supports the
32 // "Logger" and "WriteCloser" interfaces.
33 type ThrottledLogger struct {
38 stopping chan struct{}
// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano: the zeros
// in the fractional part always print all nine digits, so timestamps line up.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp renders t using the RFC3339NanoFixed layout.
func RFC3339Timestamp(t time.Time) string {
	formatted := t.AppendFormat(nil, RFC3339NanoFixed)
	return string(formatted)
}
52 // Write prepends a timestamp to each line of the input data and
53 // appends to the internal buffer. Each line is also logged to
54 // tl.Immediate, if tl.Immediate is not nil.
55 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
57 defer tl.Mutex.Unlock()
60 tl.buf = &bytes.Buffer{}
63 now := tl.Timestamper(time.Now().UTC())
64 sc := bufio.NewScanner(bytes.NewBuffer(p))
65 for err == nil && sc.Scan() {
66 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
67 if tl.Immediate != nil {
68 tl.Immediate.Print(out[:len(out)-1])
70 _, err = io.WriteString(tl.buf, out)
81 // Periodically check the current buffer; if not empty, send it on the
82 // channel to the goWriter goroutine.
83 func (tl *ThrottledLogger) flusher() {
84 ticker := time.NewTicker(time.Second)
86 for stopping := false; !stopping; {
89 // flush tl.buf, then exit the loop
94 var ready *bytes.Buffer
97 ready, tl.buf = tl.buf, nil
100 if ready != nil && ready.Len() > 0 {
101 tl.writer.Write(ready.Bytes())
107 // Close the flusher goroutine and wait for it to complete, then close the
108 // underlying Writer.
109 func (tl *ThrottledLogger) Close() error {
117 return tl.writer.Close()
121 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
125 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
127 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
128 reader := bufio.NewReaderSize(in, MaxLogLine)
131 line, isPrefix, err := reader.ReadLine()
134 } else if err != nil {
135 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
142 if prefix == "" && suffix == "" {
145 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
148 // Set up prefix for following line
158 // NewThrottledLogger creates a new thottled logger that
159 // (a) prepends timestamps to each line
160 // (b) batches log messages and only calls the underlying Writer at most once
162 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
163 tl := &ThrottledLogger{}
164 tl.stopping = make(chan struct{})
165 tl.stopped = make(chan struct{})
167 tl.Logger = log.New(tl, "", 0)
168 tl.Timestamper = RFC3339Timestamp
173 // ArvLogWriter is an io.WriteCloser that processes each write by
174 // writing it through to another io.WriteCloser (typically a
175 // CollectionFileWriter) and creating an Arvados log entry.
176 type ArvLogWriter struct {
177 ArvClient IArvadosClient
180 writeCloser io.WriteCloser
184 logThrottleResetTime time.Time
185 logThrottleLinesSoFar int64
186 logThrottleBytesSoFar int64
187 logThrottleBytesSkipped int64
188 logThrottleIsOpen bool
189 logThrottlePartialLineLastAt time.Time
190 logThrottleFirstPartialLine bool
191 stderrBufToFlush bytes.Buffer
192 stderrFlushedAt time.Time
195 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
196 // Write to the next writer in the chain (a file in Keep)
198 if arvlog.writeCloser != nil {
199 _, err1 = arvlog.writeCloser.Write(p)
202 // write to API after checking rate limit
203 crunchLogThrottlePeriod, err2 := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
204 crunchLogBytesPerEvent, err2 := arvlog.ArvClient.Discovery("crunchLogBytesPerEvent")
205 crunchLogSecondsBetweenEvents, err2 := arvlog.ArvClient.Discovery("crunchLogSecondsBetweenEvents")
207 return 0, fmt.Errorf("%s ; %s", err1, err2)
213 if now.After(arvlog.logThrottleResetTime) {
214 // It has been more than throttle_period seconds since the last
215 // checkpoint; so reset the throttle
216 if arvlog.logThrottleBytesSkipped > 0 {
217 arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now().UTC()), arvlog.logThrottleBytesSkipped))
220 arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(int(crunchLogThrottlePeriod.(float64))))
221 arvlog.logThrottleBytesSoFar = 0
222 arvlog.logThrottleLinesSoFar = 0
223 arvlog.logThrottleBytesSkipped = 0
224 arvlog.logThrottleIsOpen = true
225 arvlog.logThrottlePartialLineLastAt = time.Time{}
226 arvlog.logThrottleFirstPartialLine = true
229 lines := bytes.Split(p, []byte("\n"))
231 for _, line := range lines {
232 // Short circuit the counting code if we're just going to throw
233 // away the data anyway.
234 if !arvlog.logThrottleIsOpen {
235 arvlog.logThrottleBytesSkipped += int64(len(line))
237 } else if len(line) == 0 {
242 _, msg, err2 := arvlog.rateLimit(line)
244 return 0, fmt.Errorf("%s ; %s", err1, err2)
246 arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
249 if arvlog.stderrBufToFlush.Len() > int(crunchLogBytesPerEvent.(float64)) ||
250 (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(int64(crunchLogSecondsBetweenEvents.(float64)))) {
252 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
253 "object_uuid": arvlog.UUID,
254 "event_type": arvlog.loggingStream,
255 "properties": map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
256 err2 := arvlog.ArvClient.Create("logs", lr, nil)
258 bytesWritten = arvlog.stderrBufToFlush.Len()
259 arvlog.stderrBufToFlush = bytes.Buffer{}
260 arvlog.stderrFlushedAt = time.Now()
262 if err1 != nil || err2 != nil {
263 return 0, fmt.Errorf("%s ; %s", err1, err2)
267 return bytesWritten, nil
270 // Close the underlying writer
271 func (arvlog *ArvLogWriter) Close() (err error) {
272 if arvlog.writeCloser != nil {
273 err = arvlog.writeCloser.Close()
274 arvlog.writeCloser = nil
// lineRegexp matches a line of the form
// "<field> <field> <digits> <digits> stderr <text>" and captures <text>;
// rateLimit inspects the capture to recognise partial-line segments.
var (
	lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
)
281 // Test for hard cap on total output and for log throttling. Returns whether
282 // the log line should go to output or not. Returns message if limit exceeded.
283 func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
285 lineSize := int64(len(line))
288 if arvlog.logThrottleIsOpen {
289 matches := lineRegexp.FindStringSubmatch(string(line))
291 crunchLogPartialLineThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogPartialLineThrottlePeriod")
292 crunchLimitLogBytesPerJob, err := arvlog.ArvClient.Discovery("crunchLimitLogBytesPerJob")
293 crunchLogThrottleBytes, err := arvlog.ArvClient.Discovery("crunchLogThrottleBytes")
294 crunchLogThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
295 crunchLogThrottleLines, err := arvlog.ArvClient.Discovery("crunchLogThrottleLines")
297 return false, []byte(""), err
300 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
303 if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(int(crunchLogPartialLineThrottlePeriod.(float64))))) {
304 arvlog.logThrottlePartialLineLastAt = time.Now()
311 arvlog.logThrottleLinesSoFar += 1
312 arvlog.logThrottleBytesSoFar += lineSize
313 arvlog.bytesLogged += lineSize
316 if arvlog.bytesLogged > int64(crunchLimitLogBytesPerJob.(float64)) {
317 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), int(crunchLimitLogBytesPerJob.(float64)))
318 arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
319 arvlog.logThrottleIsOpen = false
321 } else if arvlog.logThrottleBytesSoFar > int64(crunchLogThrottleBytes.(float64)) {
322 remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
323 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogThrottleBytes.(float64)), int(crunchLogThrottlePeriod.(float64)), remainingTime/time.Second)
324 arvlog.logThrottleIsOpen = false
326 } else if arvlog.logThrottleLinesSoFar > int64(crunchLogThrottleLines.(float64)) {
327 remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
328 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogThrottleLines.(float64)), int(crunchLogThrottlePeriod.(float64)), remainingTime/time.Second)
329 arvlog.logThrottleIsOpen = false
331 } else if partialLine && arvlog.logThrottleFirstPartialLine {
332 arvlog.logThrottleFirstPartialLine = false
333 message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogPartialLineThrottlePeriod.(float64)))
338 if !arvlog.logThrottleIsOpen {
339 // Don't log anything if any limit has been exceeded. Just count lossage.
340 arvlog.logThrottleBytesSkipped += lineSize
344 // Yes, write to logs, but use our "rate exceeded" message
345 // instead of the log message that exceeded the limit.
346 message += " A complete log is still being written to Keep, and will be available when the job finishes."
347 return true, []byte(message), nil
348 } else if partialLine {
349 return false, line, nil
351 return arvlog.logThrottleIsOpen, line, nil