1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
21 // Timestamper is the signature for a function that takes a timestamp and
22 // returns a formatted string value.
23 type Timestamper func(t time.Time) string
27 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
28 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
29 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
31 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
32 // data from the stdout/stderr Reader and send to the Logger.
34 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
35 // write, and periodically flushes to a downstream writer. It supports the
36 // "Logger" and "WriteCloser" interfaces.
37 type ThrottledLogger struct {
// stopping is created unbuffered by NewThrottledLogger.
// NOTE(review): presumably used to tell the flusher goroutine to do a final
// flush and exit -- confirm against flusher and Close.
44 stopping chan struct{}
50 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
// The fractional-seconds field is written with "0" layout digits, so all
// nine digits are always emitted rather than having trailing zeros trimmed.
51 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
53 // RFC3339Timestamp formats t as RFC3339NanoFixed.
// It matches the Timestamper signature and is the formatter that
// NewThrottledLogger installs. t is formatted in its own location; callers
// in this file convert to UTC before calling.
54 func RFC3339Timestamp(t time.Time) string {
55 return t.Format(RFC3339NanoFixed)
58 // Write prepends a timestamp to each line of the input data and
59 // appends to the internal buffer. Each line is also logged to
60 // tl.Immediate, if tl.Immediate is not nil.
//
// When the buffer has grown to at least crunchLogBytesPerEvent bytes, a
// flush is requested on tl.flush without blocking.
61 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
63 defer tl.Mutex.Unlock()
66 tl.buf = &bytes.Buffer{}
// The timestamp is computed once per call, so every line contained in p
// carries the same timestamp.
69 now := tl.Timestamper(time.Now().UTC())
// NOTE(review): bufio.Scanner refuses tokens longer than
// bufio.MaxScanTokenSize (64 KiB) unless Buffer is called; Scan then returns
// false and the remainder of p is not processed. Confirm sc.Err() is
// checked after this loop.
70 sc := bufio.NewScanner(bytes.NewBuffer(p))
71 for err == nil && sc.Scan() {
72 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
73 if tl.Immediate != nil {
// Drop the trailing "\n" added by Sprintf above before handing the line
// to the immediate logger.
74 tl.Immediate.Print(out[:len(out)-1])
76 _, err = io.WriteString(tl.buf, out)
85 if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
86 // Non-blocking send. Try to send a flush request if the channel is
87 // ready to accept it. Otherwise do nothing because a flush is already
90 case tl.flush <- struct{}{}:
98 // flusher periodically checks the current buffer; if it is not empty, its
99 // contents are written to tl.writer.
// NOTE(review): the previous wording mentioned a "goWriter goroutine"; the
// visible code writes to tl.writer directly, so that wording looked stale.
100 func (tl *ThrottledLogger) flusher() {
// crunchLogSecondsBetweenEvents is already a time.Duration, so the
// conversion below is a no-op. The interval is read once, when the ticker
// is created.
101 ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
103 for stopping := false; !stopping; {
106 // flush tl.buf and exit the loop
112 var ready *bytes.Buffer
// Swap in a fresh buffer so the write below works on data that is no longer
// reachable through tl.buf.
// NOTE(review): presumably done while holding tl.Mutex -- confirm.
115 ready, tl.buf = tl.buf, &bytes.Buffer{}
118 if ready != nil && ready.Len() > 0 {
// NOTE(review): the byte count and error returned by Write are discarded,
// so a failing downstream writer goes unnoticed here.
119 tl.writer.Write(ready.Bytes())
125 // Close stops the flusher goroutine and waits for it to complete, then
126 // closes the underlying Writer and returns the error from that Close call.
127 func (tl *ThrottledLogger) Close() error {
135 return tl.writer.Close()
139 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
143 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
// lines (those that do not fit in a MaxLogLine-byte read buffer) handled in
// several pieces. Each piece is written as prefix + piece + suffix.
// NOTE(review): prefix/suffix presumably carry the "[...]" continuation
// markers that ArvLogWriter.rateLimit looks for -- confirm.
// NOTE(review): done is presumably signalled once reading ends -- confirm.
145 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
146 reader := bufio.NewReaderSize(in, MaxLogLine)
// ReadLine sets isPrefix when the line did not fit in the buffer; the
// returned bytes never include the end-of-line marker.
149 line, isPrefix, err := reader.ReadLine()
152 } else if err != nil {
// NOTE(review): errors from writer.Write are ignored here and below.
153 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
160 if prefix == "" && suffix == "" {
163 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
166 // Set up prefix for following line
176 // NewThrottledLogger creates a new throttled logger that
177 // (a) prepends timestamps to each line
178 // (b) batches log messages and normally calls the underlying Writer
179 // once per crunchLogSecondsBetweenEvents (a time.Duration).
// Write additionally requests an early flush whenever the buffer reaches
// crunchLogBytesPerEvent bytes, so "once per interval" is not a hard limit.
180 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
181 tl := &ThrottledLogger{}
// Capacity 1 lets Write queue a single pending flush request without
// blocking; further requests are dropped while one is pending.
182 tl.flush = make(chan struct{}, 1)
183 tl.stopped = make(chan struct{})
184 tl.stopping = make(chan struct{})
// No prefix and no flags: the embedded log.Logger adds nothing itself, the
// timestamp comes from tl.Write.
186 tl.Logger = log.New(tl, "", 0)
187 tl.Timestamper = RFC3339Timestamp
// Log throttling and rate limiting parameters. The values below are the
// built-in defaults; loadLogThrottleParams overwrites them with whatever the
// discovery document provides.
var (
	// crunchLimitLogBytesPerJob is the hard cap on the total number of
	// bytes ArvLogWriter.rateLimit accepts; once it is exceeded a
	// truncation notice is emitted and the throttle is closed.
	crunchLimitLogBytesPerJob int64 = 67108864

	// crunchLogThrottleBytes is the number of bytes accepted per
	// crunchLogThrottlePeriod before logging is silenced.
	crunchLogThrottleBytes int64 = 65536

	// crunchLogThrottlePeriod is the length of one throttle window; when
	// it has elapsed ArvLogWriter.Write resets the byte and line counters.
	crunchLogThrottlePeriod = 60 * time.Second

	// crunchLogThrottleLines is the number of lines accepted per
	// crunchLogThrottlePeriod before logging is silenced.
	crunchLogThrottleLines int64 = 1024

	// crunchLogPartialLineThrottlePeriod is the minimum interval between
	// partial segments of a long line that are let through.
	crunchLogPartialLineThrottlePeriod = 5 * time.Second

	// crunchLogBytesPerEvent is the buffered size at which a flush is
	// triggered without waiting for the next timed flush.
	crunchLogBytesPerEvent int64 = 4096

	// crunchLogSecondsBetweenEvents is the interval between timed flushes.
	// Despite its name it is a time.Duration, not a number of seconds.
	crunchLogSecondsBetweenEvents = time.Second

	// crunchLogUpdatePeriod is loaded by loadLogThrottleParams as a
	// duration. NOTE(review): its consumer is not in this part of the
	// file -- confirm intended use.
	crunchLogUpdatePeriod = time.Hour / 2

	// crunchLogUpdateSize is loaded by loadLogThrottleParams as an int64.
	// NOTE(review): presumably a byte count; its consumer is not in this
	// part of the file -- confirm.
	crunchLogUpdateSize = int64(1 << 25)
)
203 // ArvLogWriter is an io.WriteCloser that processes each write by
204 // writing it through to another io.WriteCloser (typically a
205 // CollectionFileWriter) and creating an Arvados log entry.
206 type ArvLogWriter struct {
// ArvClient is used by Write to create "logs" records.
207 ArvClient IArvadosClient
// writeCloser is the next writer in the chain; Write and Close skip it when
// it is nil.
210 writeCloser io.WriteCloser
// logThrottleResetTime is the instant after which Write resets the throttle
// counters and reopens the throttle.
214 logThrottleResetTime time.Time
// Lines and bytes counted by rateLimit since the last reset.
215 logThrottleLinesSoFar int64
216 logThrottleBytesSoFar int64
// logThrottleBytesSkipped counts bytes dropped since the last reset; Write
// reports it as "Skipped N bytes of log" when the throttle resets.
217 logThrottleBytesSkipped int64
// logThrottleIsOpen is false while logging is silenced.
218 logThrottleIsOpen bool
// logThrottlePartialLineNextAt is the time after which the next partial
// segment of a long line is let through.
219 logThrottlePartialLineNextAt time.Time
// logThrottleFirstPartialLine is true until the first partial segment after
// a complete line has been seen; rateLimit sets it back to true on a
// non-partial line.
220 logThrottleFirstPartialLine bool
// bufToFlush holds the text waiting to be sent in the next log record.
221 bufToFlush bytes.Buffer
// bufFlushedAt is the time bufToFlush was last sent and cleared.
222 bufFlushedAt time.Time
// Write passes p to the underlying writeCloser (when set), then applies the
// rate limits line by line and buffers the surviving text for the Arvados
// "logs" API. The buffer is sent with ArvClient.Create once it is non-empty
// and either holds at least crunchLogBytesPerEvent bytes, or
// crunchLogSecondsBetweenEvents has passed since the last send, or the
// writer is closing.
226 func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
227 // Write to the next writer in the chain (a file in Keep)
229 if arvlog.writeCloser != nil {
230 _, err1 = arvlog.writeCloser.Write(p)
233 // write to API after checking rate limit
236 if now.After(arvlog.logThrottleResetTime) {
237 // It has been more than throttle_period seconds since the last
238 // checkpoint; so reset the throttle
239 if arvlog.logThrottleBytesSkipped > 0 {
240 arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
243 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
244 arvlog.logThrottleBytesSoFar = 0
245 arvlog.logThrottleLinesSoFar = 0
246 arvlog.logThrottleBytesSkipped = 0
247 arvlog.logThrottleIsOpen = true
// bytes.Split yields an empty final element when p ends with "\n"; empty
// elements take the len(line) == 0 branch below.
250 lines := bytes.Split(p, []byte("\n"))
252 for _, line := range lines {
253 // Short circuit the counting code if we're just going to throw
254 // away the data anyway.
255 if !arvlog.logThrottleIsOpen {
// Only the line's own bytes are counted; the "\n" separators are not.
256 arvlog.logThrottleBytesSkipped += int64(len(line))
258 } else if len(line) == 0 {
263 logOpen, msg := arvlog.rateLimit(line, now)
265 arvlog.bufToFlush.WriteString(string(msg) + "\n")
269 if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
270 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
271 arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
273 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
274 "object_uuid": arvlog.UUID,
275 "event_type": arvlog.loggingStream,
276 "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
277 err2 := arvlog.ArvClient.Create("logs", lr, nil)
// NOTE(review): the buffer appears to be cleared whether or not Create
// succeeded, so text from a failed request is not retried -- confirm.
279 arvlog.bufToFlush = bytes.Buffer{}
280 arvlog.bufFlushedAt = now
// NOTE(review): when only one of err1/err2 is set, the nil one is rendered
// by %s as "%!s(<nil>)". The byte count returned is 0 even if writeCloser
// accepted p.
282 if err1 != nil || err2 != nil {
283 return 0, fmt.Errorf("%s ; %s", err1, err2)
290 // Close flushes any buffered log text and closes the underlying writer,
// returning the error from that writer's Close.
291 func (arvlog *ArvLogWriter) Close() (err error) {
// Setting closing and issuing an empty Write makes Write send whatever is
// left in bufToFlush.
// NOTE(review): the error returned by this final Write is discarded.
292 arvlog.closing = true
293 arvlog.Write([]byte{})
294 if arvlog.writeCloser != nil {
295 err = arvlog.writeCloser.Close()
// Clear the field so a later Close or Write skips the closed writer.
296 arvlog.writeCloser = nil
// lineRegexp matches a line that starts with a run of non-space characters
// followed by one space, and captures everything after that space. rateLimit
// uses the captured text to recognise partial-line markers.
// NOTE(review): the leading field is presumably the timestamp -- confirm.
301 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
303 // rateLimit tests for the hard cap on total output and for log throttling.
304 // It returns whether something should go to the output, and the bytes to
// write: the original line, or a notice message when a limit has just been
// exceeded or partial-line throttling has just started.
305 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
307 lineSize := int64(len(line))
309 if arvlog.logThrottleIsOpen {
310 matches := lineRegexp.FindStringSubmatch(string(line))
// A line counts as partial only when the text after the first
// space-delimited field both starts and ends with "[...]".
312 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
313 // This is a partial line.
315 if arvlog.logThrottleFirstPartialLine {
316 // Partial should be suppressed. First time this is happening for this line so provide a message instead.
317 arvlog.logThrottleFirstPartialLine = false
318 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
319 arvlog.logThrottleBytesSkipped += lineSize
// Dividing the Duration by time.Second truncates to whole seconds for %d.
320 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
321 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
322 } else if now.After(arvlog.logThrottlePartialLineNextAt) {
323 // The throttle period has passed. Update timestamp and let it through.
324 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
327 arvlog.logThrottleBytesSkipped += lineSize
331 // Not a partial line so reset.
332 arvlog.logThrottlePartialLineNextAt = time.Time{}
333 arvlog.logThrottleFirstPartialLine = true
336 arvlog.bytesLogged += lineSize
337 arvlog.logThrottleBytesSoFar += lineSize
338 arvlog.logThrottleLinesSoFar += 1
340 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
341 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
342 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
// Push the reset time a year ahead so Write does not reopen the throttle
// after the hard cap is hit. The time.Duration conversion is redundant:
// the operand is already a Duration.
343 arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
344 arvlog.logThrottleIsOpen = false
346 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
// remainingTime is what is left of the current throttle window; it is
// reported in whole seconds.
347 remainingTime := arvlog.logThrottleResetTime.Sub(now)
348 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
349 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
350 arvlog.logThrottleIsOpen = false
352 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
353 remainingTime := arvlog.logThrottleResetTime.Sub(now)
354 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
355 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
356 arvlog.logThrottleIsOpen = false
361 if !arvlog.logThrottleIsOpen {
362 // Don't log anything if any limit has been exceeded. Just count lossage.
363 arvlog.logThrottleBytesSkipped += lineSize
367 // Yes, write to logs, but use our "rate exceeded" message
368 // instead of the log message that exceeded the limit.
369 message += " A complete log is still being written to Keep, and will be available when the job finishes."
370 return true, []byte(message)
372 return arvlog.logThrottleIsOpen, line
376 // loadLogThrottleParams loads the rate limit configuration parameters from
// the discovery document via clnt.Discovery and stores them in the
// package-level variables. Each value must assert to float64; durations are
// given in seconds.
// NOTE(review): float64 is presumably how JSON numbers arrive -- confirm.
377 func loadLogThrottleParams(clnt IArvadosClient) {
378 loadDuration := func(dst *time.Duration, key string) {
379 if param, err := clnt.Discovery(key); err != nil {
381 } else if d, ok := param.(float64); !ok {
// NOTE(review): time.Duration(d) truncates d to a whole number before the
// multiplication, so fractional seconds are lost (0.5 becomes 0).
// time.Duration(d * float64(time.Second)) would keep them.
384 *dst = time.Duration(d) * time.Second
387 loadInt64 := func(dst *int64, key string) {
388 if param, err := clnt.Discovery(key); err != nil {
390 } else if val, ok := param.(float64); !ok {
397 loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
398 loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
399 loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
400 loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
401 loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
402 loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
403 loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
404 loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
405 loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")