1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
22 // Timestamper is the signature for a function that takes a timestamp and
23 // returns a formatted string value.
24 type Timestamper func(t time.Time) string
28 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
29 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
30 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
32 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
33 // data from the stdout/stderr Reader and send to the Logger.
35 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
36 // write, and periodically flushes to a downstream writer. It supports the
37 // "Logger" and "WriteCloser" interfaces.
38 type ThrottledLogger struct {
45 stopping chan struct{}
51 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
52 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
54 // RFC3339Timestamp formats t as RFC3339NanoFixed.
55 func RFC3339Timestamp(t time.Time) string {
56 return t.Format(RFC3339NanoFixed)
59 // Write prepends a timestamp to each line of the input data and
60 // appends to the internal buffer. Each line is also logged to
61 // tl.Immediate, if tl.Immediate is not nil.
62 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
64 defer tl.Mutex.Unlock()
67 tl.buf = &bytes.Buffer{}
70 now := tl.Timestamper(time.Now().UTC())
71 sc := bufio.NewScanner(bytes.NewBuffer(p))
72 for err == nil && sc.Scan() {
73 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
74 if tl.Immediate != nil {
75 tl.Immediate.Print(out[:len(out)-1])
77 _, err = io.WriteString(tl.buf, out)
86 if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
87 // Non-blocking send. Try to send a flush if it is ready to
88 // accept it. Otherwise do nothing because a flush is already
91 case tl.flush <- struct{}{}:
99 // Periodically check the current buffer; if not empty, pass it to the
100 // underlying writer.
101 func (tl *ThrottledLogger) flusher() {
102 ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
104 for stopping := false; !stopping; {
107 // flush tl.buf and exit the loop
113 var ready *bytes.Buffer
116 ready, tl.buf = tl.buf, &bytes.Buffer{}
119 if ready != nil && ready.Len() > 0 {
120 tl.writer.Write(ready.Bytes())
126 // Close the flusher goroutine and wait for it to complete, then close the
127 // underlying Writer.
128 func (tl *ThrottledLogger) Close() error {
136 return tl.writer.Close()
140 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
144 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
146 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
147 reader := bufio.NewReaderSize(in, MaxLogLine)
150 line, isPrefix, err := reader.ReadLine()
153 } else if err != nil {
154 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
161 if prefix == "" && suffix == "" {
164 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
167 // Set up prefix for following line
177 // NewThrottledLogger creates a new throttled logger that
178 // - prepends timestamps to each line, and
179 // - batches log messages and only calls the underlying Writer
180 // at most once per "crunchLogSecondsBetweenEvents" seconds.
181 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
182 tl := &ThrottledLogger{}
183 tl.flush = make(chan struct{}, 1)
184 tl.stopped = make(chan struct{})
185 tl.stopping = make(chan struct{})
187 tl.Logger = log.New(tl, "", 0)
188 tl.Timestamper = RFC3339Timestamp
193 // Log throttling rate limiting config parameters
194 var crunchLimitLogBytesPerJob int64 = 67108864 // total bytes rateLimit accepts per ArvLogWriter before the log is truncated
195 var crunchLogThrottleBytes int64 = 65536 // bytes allowed per throttle period before rateLimit silences logging
196 var crunchLogThrottlePeriod time.Duration = time.Second * 60 // length of one throttle window; ArvLogWriter.Write resets the counters once it elapses
197 var crunchLogThrottleLines int64 = 1024 // lines allowed per throttle period before rateLimit silences logging
198 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 // minimum interval between "[...]" partial-line segments that rateLimit lets through
199 var crunchLogBytesPerEvent int64 = 4096 // buffered size at which ThrottledLogger.Write requests a flush and ArvLogWriter.Write creates a log entry
200 var crunchLogSecondsBetweenEvents = time.Second // flusher ticker interval; also the elapsed time after which ArvLogWriter.Write flushes its buffer
201 var crunchLogUpdatePeriod = time.Hour / 2 // NOTE(review): not referenced in the visible part of this file; confirm usage
202 var crunchLogUpdateSize = int64(1 << 25) // NOTE(review): not referenced in the visible part of this file; confirm usage
204 // ArvLogWriter is an io.WriteCloser that processes each write by
205 // writing it through to another io.WriteCloser (typically a
206 // CollectionFileWriter) and creating an Arvados log entry.
207 type ArvLogWriter struct {
208 ArvClient IArvadosClient
211 writeCloser io.WriteCloser
215 logThrottleResetTime time.Time
216 logThrottleLinesSoFar int64
217 logThrottleBytesSoFar int64
218 logThrottleBytesSkipped int64
219 logThrottleIsOpen bool
220 logThrottlePartialLineNextAt time.Time
221 logThrottleFirstPartialLine bool
222 bufToFlush bytes.Buffer
223 bufFlushedAt time.Time
227 func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
228 // Write to the next writer in the chain (a file in Keep)
230 if arvlog.writeCloser != nil {
231 _, err1 = arvlog.writeCloser.Write(p)
234 // write to API after checking rate limit
237 if now.After(arvlog.logThrottleResetTime) {
238 // It has been more than throttle_period seconds since the last
239 // checkpoint; so reset the throttle
240 if arvlog.logThrottleBytesSkipped > 0 {
241 arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
244 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
245 arvlog.logThrottleBytesSoFar = 0
246 arvlog.logThrottleLinesSoFar = 0
247 arvlog.logThrottleBytesSkipped = 0
248 arvlog.logThrottleIsOpen = true
251 lines := bytes.Split(p, []byte("\n"))
253 for _, line := range lines {
254 // Short circuit the counting code if we're just going to throw
255 // away the data anyway.
256 if !arvlog.logThrottleIsOpen {
257 arvlog.logThrottleBytesSkipped += int64(len(line))
259 } else if len(line) == 0 {
264 logOpen, msg := arvlog.rateLimit(line, now)
266 arvlog.bufToFlush.WriteString(string(msg) + "\n")
270 if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
271 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
272 arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
274 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
275 "object_uuid": arvlog.UUID,
276 "event_type": arvlog.loggingStream,
277 "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
278 err2 := arvlog.ArvClient.Create("logs", lr, nil)
280 arvlog.bufToFlush = bytes.Buffer{}
281 arvlog.bufFlushedAt = now
283 if err1 != nil || err2 != nil {
284 return 0, fmt.Errorf("%s ; %s", err1, err2)
291 // Close the underlying writer
292 func (arvlog *ArvLogWriter) Close() (err error) {
293 arvlog.closing = true
294 arvlog.Write([]byte{})
295 if arvlog.writeCloser != nil {
296 err = arvlog.writeCloser.Close()
297 arvlog.writeCloser = nil
302 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`) // captures the text after the first space-terminated token (presumably the timestamp prefix); rateLimit uses it to detect "[...]" partial lines
304 // Test for hard cap on total output and for log throttling. Returns whether
305 // the log line should go to output or not. Returns message if limit exceeded.
306 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
308 lineSize := int64(len(line))
310 if arvlog.logThrottleIsOpen {
311 matches := lineRegexp.FindStringSubmatch(string(line))
313 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
314 // This is a partial line.
316 if arvlog.logThrottleFirstPartialLine {
317 // Partial should be suppressed. First time this is happening for this line so provide a message instead.
318 arvlog.logThrottleFirstPartialLine = false
319 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
320 arvlog.logThrottleBytesSkipped += lineSize
321 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
322 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
323 } else if now.After(arvlog.logThrottlePartialLineNextAt) {
324 // The throttle period has passed. Update timestamp and let it through.
325 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
328 arvlog.logThrottleBytesSkipped += lineSize
332 // Not a partial line so reset.
333 arvlog.logThrottlePartialLineNextAt = time.Time{}
334 arvlog.logThrottleFirstPartialLine = true
337 arvlog.bytesLogged += lineSize
338 arvlog.logThrottleBytesSoFar += lineSize
339 arvlog.logThrottleLinesSoFar++
341 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
342 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
343 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
344 arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
345 arvlog.logThrottleIsOpen = false
347 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
348 remainingTime := arvlog.logThrottleResetTime.Sub(now)
349 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
350 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
351 arvlog.logThrottleIsOpen = false
353 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
354 remainingTime := arvlog.logThrottleResetTime.Sub(now)
355 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
356 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
357 arvlog.logThrottleIsOpen = false
362 if !arvlog.logThrottleIsOpen {
363 // Don't log anything if any limit has been exceeded. Just count lossage.
364 arvlog.logThrottleBytesSkipped += lineSize
368 // Yes, write to logs, but use our "rate exceeded" message
369 // instead of the log message that exceeded the limit.
370 message += " A complete log is still being written to Keep, and will be available when the job finishes."
371 return true, []byte(message)
373 return arvlog.logThrottleIsOpen, line
376 // loadLogThrottleParams loads the log rate-limit config parameters via clnt.Discovery.
377 func loadLogThrottleParams(clnt IArvadosClient) {
378 loadDuration := func(dst *time.Duration, key string) {
379 if param, err := clnt.Discovery(key); err != nil {
381 } else if d, ok := param.(float64); !ok {
384 *dst = time.Duration(d) * time.Second
387 loadInt64 := func(dst *int64, key string) {
388 if param, err := clnt.Discovery(key); err != nil {
390 } else if val, ok := param.(float64); !ok {
397 loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
398 loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
399 loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
400 loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
401 loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
402 loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
403 loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
404 loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
405 loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
409 type filterKeepstoreErrorsOnly struct {
414 func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
415 log.Printf("filterKeepstoreErrorsOnly: write %q", p)
416 f.buf = append(f.buf, p...)
418 for i := len(f.buf) - len(p); i < len(f.buf); i++ {
419 if f.buf[i] == '\n' {
420 if f.check(f.buf[start:i]) {
421 _, err := f.WriteCloser.Write(f.buf[start : i+1])
430 copy(f.buf, f.buf[start:])
431 f.buf = f.buf[:len(f.buf)-start]
436 func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
443 var m map[string]interface{}
444 err := json.Unmarshal(line, &m)
448 if m["msg"] == "request" {
451 if m["msg"] == "response" {
452 if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {