14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Timestamper is the signature for a function that takes a timestamp and
// returns a formatted string value. ThrottledLogger calls it once per Write
// and uses the result as the prefix of every line in that write.
type Timestamper func(t time.Time) string
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer. It exposes a
32 // *log.Logger (the Logger field) and implements io.WriteCloser.
33 type ThrottledLogger struct {
// stopping is created by NewThrottledLogger; presumably closed by Close to
// tell the flusher goroutine to drain tl.buf and exit — TODO confirm.
40 stopping chan struct{}
const (
	// RFC3339NanoFixed is a fixed-width variant of time.RFC3339Nano. Its
	// ".000000000" fraction always prints all nine digits, whereas
	// time.RFC3339Nano uses ".999999999" and drops trailing zeros. As a
	// result, timestamps formatted in the same zone (e.g. UTC) line up and
	// sort lexically.
	RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
)
49 // RFC3339Timestamp formats t as RFC3339NanoFixed.
// It is the default Timestamper installed by NewThrottledLogger and is also
// used for the throttling notices written by ArvLogWriter. It does not
// convert t to UTC; callers pass a UTC time when a "Z" suffix is wanted.
50 func RFC3339Timestamp(t time.Time) string {
51 return t.Format(RFC3339NanoFixed)
54 // Write prepends a timestamp to each line of the input data and
55 // appends to the internal buffer. Each line is also logged to
56 // tl.Immediate, if tl.Immediate is not nil.
// All lines of one call share a single timestamp, produced by tl.Timestamper
// from the current time in UTC. Once the buffer holds at least
// crunchLogBytesPerEvent bytes, a non-blocking flush request is sent.
57 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
59 defer tl.Mutex.Unlock()
// Allocate a buffer for this batch (presumably only when tl.buf is nil;
// flusher also swaps in a fresh buffer after each flush).
62 tl.buf = &bytes.Buffer{}
65 now := tl.Timestamper(time.Now().UTC())
// NOTE(review): bufio.Scanner stops with bufio.ErrTooLong on a line longer
// than bufio.MaxScanTokenSize (64 KiB) unless a larger buffer is set;
// confirm that sc.Err() is surfaced after the loop.
66 sc := bufio.NewScanner(bytes.NewBuffer(p))
67 for err == nil && sc.Scan() {
68 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
69 if tl.Immediate != nil {
// Drop the trailing newline; tl.Immediate (presumably a *log.Logger)
// appends one itself when it is missing.
70 tl.Immediate.Print(out[:len(out)-1])
72 _, err = io.WriteString(tl.buf, out)
81 if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
82 // Non-blocking send. Try to send a flush if it is ready to
83 // accept it. Otherwise do nothing because a flush is already
// tl.flush is created with capacity 1, so at most one request is queued.
86 case tl.flush <- struct{}{}:
94 // flusher periodically swaps out the current buffer and, if it is not
95 // empty, writes its contents directly to tl.writer.
// It wakes on a ticker firing every crunchLogSecondsBetweenEvents and
// (presumably) on requests sent to tl.flush by Write. The loop ends once
// stopping is set after a final flush — TODO confirm against the select
// cases not shown here.
96 func (tl *ThrottledLogger) flusher() {
// NOTE(review): the time.Duration(...) conversion is a no-op;
// crunchLogSecondsBetweenEvents is already a time.Duration.
97 ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
99 for stopping := false; !stopping; {
102 // flush tl.buf and exit the loop
108 var ready *bytes.Buffer
// Swap in a fresh buffer (presumably under tl.Mutex) so Write can continue
// while the old one is written out.
111 ready, tl.buf = tl.buf, &bytes.Buffer{}
114 if ready != nil && ready.Len() > 0 {
// NOTE(review): the error returned by tl.writer.Write is discarded.
115 tl.writer.Write(ready.Bytes())
121 // Close stops the flusher goroutine and waits for it to finish its final
122 // flush, then closes the underlying Writer and returns that error.
123 func (tl *ThrottledLogger) Close() error {
131 return tl.writer.Close()
135 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
139 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
// Input is read through a bufio.Reader of size MaxLogLine. ReadLine returns a
// longer line in several pieces, with isPrefix true for every piece except
// the last. Each piece is written separately, decorated with prefix/suffix
// strings (presumably the "[...]" markers that rateLimit recognizes — TODO
// confirm). NOTE(review): how done is signalled is not visible here;
// presumably it fires when in is exhausted.
141 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
142 reader := bufio.NewReaderSize(in, MaxLogLine)
145 line, isPrefix, err := reader.ReadLine()
148 } else if err != nil {
// Read errors are reported in-band on writer rather than returned.
149 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
156 if prefix == "" && suffix == "" {
// NOTE(review): fmt.Fprint(writer, ...) would avoid the intermediate
// string/[]byte copies. Write errors are ignored here.
159 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
162 // Set up prefix for following line
172 // NewThrottledLogger creates a new throttled logger that
173 // (a) prepends timestamps to each line
174 // (b) batches log messages and calls the underlying Writer about once per
175 // crunchLogSecondsBetweenEvents, or sooner when at least
// crunchLogBytesPerEvent bytes are buffered.
176 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
177 tl := &ThrottledLogger{}
// Capacity 1 lets Write queue a single flush request without blocking.
178 tl.flush = make(chan struct{}, 1)
179 tl.stopped = make(chan struct{})
180 tl.stopping = make(chan struct{})
// No prefix and no flags, because Write adds its own timestamp to every line.
182 tl.Logger = log.New(tl, "", 0)
183 tl.Timestamper = RFC3339Timestamp
// Log throttling rate limiting config parameters. These are defaults;
// loadLogThrottleParams may overwrite them with values from the API
// server's discovery document.
var (
	// crunchLimitLogBytesPerJob caps the total bytes sent to the API log
	// over the whole job (64 MiB). Once it is exceeded, API logging is closed.
	crunchLimitLogBytesPerJob int64 = 67108864
	// crunchLogThrottleBytes is the number of bytes allowed per
	// crunchLogThrottlePeriod before logging is silenced.
	crunchLogThrottleBytes int64 = 65536
	// crunchLogThrottlePeriod is the length of each throttling window.
	crunchLogThrottlePeriod = 60 * time.Second
	// crunchLogThrottleLines is the number of lines allowed per
	// crunchLogThrottlePeriod before logging is silenced.
	crunchLogThrottleLines int64 = 1024
	// crunchLogPartialLineThrottlePeriod is the minimum interval between
	// partial segments of an over-long line that are let through.
	crunchLogPartialLineThrottlePeriod = 5 * time.Second
	// crunchLogBytesPerEvent is the buffered size that triggers an
	// immediate flush.
	crunchLogBytesPerEvent int64 = 4096
	// crunchLogSecondsBetweenEvents is the maximum delay before buffered
	// text is flushed. Despite its name, it is a time.Duration, not a count
	// of seconds.
	crunchLogSecondsBetweenEvents = time.Second
)
197 // ArvLogWriter is an io.WriteCloser that processes each write by
198 // writing it through to another io.WriteCloser (typically a
199 // CollectionFileWriter) and creating an Arvados log entry.
// The pass-through writer receives every byte unthrottled. Text destined for
// the API is filtered by the crunchLog* limits and batched in bufToFlush. It
// is sent as a "logs" record when a size or time threshold is reached, or on
// Close.
200 type ArvLogWriter struct {
// ArvClient is used to create "logs" records.
201 ArvClient IArvadosClient
// writeCloser receives all output; Close closes it and sets it to nil.
204 writeCloser io.WriteCloser
// End of the current throttle period. Write resets the counters below once
// its current time is after this.
208 logThrottleResetTime time.Time
// Lines and bytes accepted during the current throttle period.
209 logThrottleLinesSoFar int64
210 logThrottleBytesSoFar int64
// Bytes suppressed since the last reset, later reported as "Skipped N bytes".
211 logThrottleBytesSkipped int64
// False while output to the API is being suppressed.
212 logThrottleIsOpen bool
// Earliest time the next partial-line segment may be passed through.
213 logThrottlePartialLineNextAt time.Time
// Set after a non-partial line; cleared when the next partial segment is seen.
214 logThrottleFirstPartialLine bool
// Text waiting to be sent to the API, and when text was last sent.
215 bufToFlush bytes.Buffer
216 bufFlushedAt time.Time
// Write passes p unchanged to the underlying writeCloser (if any). It then
// feeds each line of p through the throttling rules and appends the
// surviving text to bufToFlush. The buffer is sent to the API as a "logs"
// record when it reaches crunchLogBytesPerEvent bytes, when
// crunchLogSecondsBetweenEvents has elapsed since the last send, or when the
// writer is closing.
//
// NOTE(review): the returned n is bytesWritten (the size of the batch sent
// to the API), not len(p). When nothing is sent, this presumably returns
// n < len(p) with a nil error, which the io.Writer contract forbids.
220 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
221 // Write to the next writer in the chain (a file in Keep)
223 if arvlog.writeCloser != nil {
224 _, err1 = arvlog.writeCloser.Write(p)
227 // write to API after checking rate limit
231 if now.After(arvlog.logThrottleResetTime) {
232 // It has been more than throttle_period seconds since the last
233 // checkpoint; so reset the throttle
234 if arvlog.logThrottleBytesSkipped > 0 {
// Report how much was suppressed during the previous period.
235 arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
238 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
239 arvlog.logThrottleBytesSoFar = 0
240 arvlog.logThrottleLinesSoFar = 0
241 arvlog.logThrottleBytesSkipped = 0
242 arvlog.logThrottleIsOpen = true
// A trailing "\n" yields a final empty element; empty lines are
// special-cased in the loop.
245 lines := bytes.Split(p, []byte("\n"))
247 for _, line := range lines {
248 // Short circuit the counting code if we're just going to throw
249 // away the data anyway.
250 if !arvlog.logThrottleIsOpen {
251 arvlog.logThrottleBytesSkipped += int64(len(line))
253 } else if len(line) == 0 {
// rateLimit reports whether to emit msg. msg is either line itself or a
// notice that replaces it when a limit has just been exceeded.
258 logOpen, msg := arvlog.rateLimit(line, now)
260 arvlog.bufToFlush.WriteString(string(msg) + "\n")
264 if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
265 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
266 arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
268 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
269 "object_uuid": arvlog.UUID,
270 "event_type": arvlog.loggingStream,
271 "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
272 err2 := arvlog.ArvClient.Create("logs", lr, nil)
274 bytesWritten = arvlog.bufToFlush.Len()
275 arvlog.bufToFlush = bytes.Buffer{}
276 arvlog.bufFlushedAt = now
// NOTE(review): err2 is declared with := inside the send branch, so this
// check presumably runs only when a batch was sent. An err1 from
// writeCloser.Write on a call that sends nothing appears to be dropped.
// Also, "%s" renders a nil error as "%!s(<nil>)"; "%v" or errors.Join
// would read better.
278 if err1 != nil || err2 != nil {
279 return 0, fmt.Errorf("%s ; %s", err1, err2)
283 return bytesWritten, nil
286 // Close flushes pending API log text and closes the underlying writer.
// It sets closing and calls Write with no data, which sends any text still
// in bufToFlush to the API. It then closes the underlying writer, if any,
// and returns that error. writeCloser is cleared, so a repeated Close does
// not close the writer twice.
287 func (arvlog *ArvLogWriter) Close() (err error) {
288 arvlog.closing = true
// NOTE(review): any error from this final flush is discarded.
289 arvlog.Write([]byte{})
290 if arvlog.writeCloser != nil {
291 err = arvlog.writeCloser.Close()
292 arvlog.writeCloser = nil
var (
	// lineRegexp splits a log line at its first space. The leading
	// non-whitespace token (normally the timestamp prepended by
	// ThrottledLogger.Write) is skipped, and everything after the space is
	// captured as group 1.
	lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
)
299 // rateLimit applies the per-job byte cap, the per-period byte and line
300 // throttles, and partial-line throttling to a single line. It returns
// whether anything should be emitted, and the text to emit. The text is the
// line itself or, when a limit has just been exceeded, a notice replacing it.
// Lines are only classified and counted while logThrottleIsOpen. Once a limit
// trips, logging stays closed until Write reopens it at the next
// logThrottleResetTime, which the per-job cap pushes a year out.
301 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
303 lineSize := int64(len(line))
305 if arvlog.logThrottleIsOpen {
// A partial segment of an over-long line is recognized by "[...]" at both
// ends of the text that follows the leading timestamp token.
306 matches := lineRegexp.FindStringSubmatch(string(line))
308 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
309 // This is a partial line.
311 if arvlog.logThrottleFirstPartialLine {
312 // Partial should be suppressed. First time this is happening for this line so provide a message instead.
313 arvlog.logThrottleFirstPartialLine = false
314 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
315 arvlog.logThrottleBytesSkipped += lineSize
316 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
317 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
318 } else if now.After(arvlog.logThrottlePartialLineNextAt) {
319 // The throttle period has passed. Update timestamp and let it through.
320 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
323 arvlog.logThrottleBytesSkipped += lineSize
327 // Not a partial line so reset.
328 arvlog.logThrottlePartialLineNextAt = time.Time{}
329 arvlog.logThrottleFirstPartialLine = true
332 arvlog.bytesLogged += lineSize
333 arvlog.logThrottleBytesSoFar += lineSize
334 arvlog.logThrottleLinesSoFar += 1
336 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
337 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
338 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
// NOTE(review): the time.Duration(...) conversion is redundant;
// 365 * 24 * time.Hour is already a Duration.
339 arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
340 arvlog.logThrottleIsOpen = false
342 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
343 remainingTime := arvlog.logThrottleResetTime.Sub(now)
// Dividing a Duration by time.Second yields whole seconds (truncated) for %d.
344 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
345 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
346 arvlog.logThrottleIsOpen = false
348 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
349 remainingTime := arvlog.logThrottleResetTime.Sub(now)
350 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
351 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
352 arvlog.logThrottleIsOpen = false
357 if !arvlog.logThrottleIsOpen {
358 // Don't log anything if any limit has been exceeded. Just count lossage.
359 arvlog.logThrottleBytesSkipped += lineSize
363 // Yes, write to logs, but use our "rate exceeded" message
364 // instead of the log message that exceeded the limit.
365 message += " A complete log is still being written to Keep, and will be available when the job finishes."
366 return true, []byte(message)
368 return arvlog.logThrottleIsOpen, line
372 // loadLogThrottleParams loads the rate-limit config parameters
// from the API server's discovery document into the package-level crunchLog*
// variables. Numeric values are asserted to be float64 (presumably decoded
// from JSON). Duration parameters are interpreted as seconds.
// NOTE(review): each assignment is presumably guarded by err == nil. The
// param.(float64) assertions are unchecked and panic if the server returns
// a non-numeric value; consider the comma-ok form.
373 func loadLogThrottleParams(clnt IArvadosClient) {
374 param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
376 crunchLimitLogBytesPerJob = int64(param.(float64))
379 param, err = clnt.Discovery("crunchLogThrottleBytes")
381 crunchLogThrottleBytes = int64(param.(float64))
384 param, err = clnt.Discovery("crunchLogThrottlePeriod")
386 crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
389 param, err = clnt.Discovery("crunchLogThrottleLines")
391 crunchLogThrottleLines = int64(param.(float64))
394 param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
396 crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
399 param, err = clnt.Discovery("crunchLogBytesPerEvent")
401 crunchLogBytesPerEvent = int64(param.(float64))
404 param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
406 crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))