14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Timestamper is the signature for a function that takes a timestamp and
// returns a formatted string value. ThrottledLogger calls one to produce the
// prefix it puts in front of each buffered line.
type Timestamper func(t time.Time) string
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer. It can be used
32 // through its Logger field (a *log.Logger that writes back into tl) or
// directly as an io.WriteCloser via its Write and Close methods.
//
// Create one with NewThrottledLogger, which initializes the flush and
// stopped channels, Logger, and Timestamper. NOTE(review): Write calls
// tl.Timestamper, so a zero-value ThrottledLogger is presumably not usable.
33 type ThrottledLogger struct {
// RFC3339NanoFixed is time.RFC3339Nano with the fractional seconds always
// printed as exactly nine digits (time.RFC3339Nano drops trailing zeros), so
// timestamps with the same zone offset all have the same width.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp renders t using the RFC3339NanoFixed layout. Its
// signature matches Timestamper.
func RFC3339Timestamp(t time.Time) string {
	stamp := t.Format(RFC3339NanoFixed)
	return stamp
}
53 // Write prepends a timestamp to each line of the input data and
54 // appends to the internal buffer. Each line is also logged to
55 // tl.Immediate, if tl.Immediate is not nil.
// All lines of one call share a single timestamp, taken before the scan
// loop. Once the buffer holds crunchLogBytesPerEvent bytes or more, Write
// asks the flusher goroutine for an early flush via a non-blocking send on
// tl.flush.
56 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
58 defer tl.Mutex.Unlock()
61 tl.buf = &bytes.Buffer{}
// One timestamp for every line in p.
64 now := tl.Timestamper(time.Now().UTC())
// NOTE(review): bufio.Scanner's default token limit is 64 KiB; a longer line
// makes Scan return false, and how sc.Err() is handled is not visible here.
65 sc := bufio.NewScanner(bytes.NewBuffer(p))
66 for err == nil && sc.Scan() {
67 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
68 if tl.Immediate != nil {
// Hand Immediate the line without the trailing newline added above.
69 tl.Immediate.Print(out[:len(out)-1])
71 _, err = io.WriteString(tl.buf, out)
80 if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
81 // Non-blocking send. Try to send a flush if the flusher is ready to
82 // accept it. Otherwise do nothing because a flush is already
85 case tl.flush <- struct{}{}:
93 // flusher periodically takes the current buffer and, if it is not empty,
94 // writes its contents to tl.writer.
// It wakes on a ticker with period crunchLogSecondsBetweenEvents and on
// receives from tl.flush (which Write uses to request an early flush). Per
// the comment in the receive case, closing tl.flush triggers one final
// flush and ends the loop.
// NOTE(review): the ticker case and any locking around the tl.buf swap are
// on lines not visible here; confirm.
95 func (tl *ThrottledLogger) flusher() {
// crunchLogSecondsBetweenEvents is already a time.Duration, so this
// conversion is a no-op.
96 ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
98 for stopping := false; !stopping; {
100 case _, open := <-tl.flush:
101 // if !open, will flush tl.buf and exit the loop
106 var ready *bytes.Buffer
// Swap in a fresh buffer; the old one is written out below.
109 ready, tl.buf = tl.buf, &bytes.Buffer{}
112 if ready != nil && ready.Len() > 0 {
// NOTE(review): the error returned by tl.writer.Write is discarded.
113 tl.writer.Write(ready.Bytes())
119 // Close stops the flusher goroutine and waits for it to complete, then
120 // closes the underlying Writer and returns its error.
121 func (tl *ThrottledLogger) Close() error {
129 return tl.writer.Close()
133 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
137 // ReadWriteLines reads lines from in and writes each one to writer. Lines
// longer than the reader's buffer (MaxLogLine bytes) come back from ReadLine
// in pieces (isPrefix), and those pieces are written with prefix/suffix
// markers around them. NOTE(review): the marker values and the use of done
// are on lines not visible here. rateLimit treats text wrapped in "[...]" as
// a partial segment, which suggests that is the marker; confirm.
// NOTE(review): errors returned by writer.Write are ignored.
139 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
140 reader := bufio.NewReaderSize(in, MaxLogLine)
143 line, isPrefix, err := reader.ReadLine()
146 } else if err != nil {
// Report the read error in-band, as a line in the log itself.
147 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
// Unsplit lines need no markers; split pieces get prefix and/or suffix.
154 if prefix == "" && suffix == "" {
157 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
160 // Set up prefix for following line
170 // NewThrottledLogger creates a new throttled logger that
171 // (a) prepends timestamps to each line, and
172 // (b) batches log messages, calling the underlying Writer about once per
173 // crunchLogSecondsBetweenEvents, or sooner once crunchLogBytesPerEvent
// bytes are buffered (Write then requests an early flush).
174 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
175 tl := &ThrottledLogger{}
// Capacity 1 lets Write queue one flush request without blocking.
176 tl.flush = make(chan struct{}, 1)
177 tl.stopped = make(chan struct{})
// The Logger writes back through tl.Write. Flags are 0 because tl adds its
// own timestamps.
179 tl.Logger = log.New(tl, "", 0)
180 tl.Timestamper = RFC3339Timestamp
// Log throttling and rate-limiting parameters. The values below are
// defaults; loadLogThrottleParams may replace each one with a value
// obtained from the API client's Discovery call.
//
// Byte defaults: 64 MiB of log per job, 64 KiB per throttle period, and a
// 4 KiB buffer threshold that triggers sending a log event.
var (
	crunchLimitLogBytesPerJob          int64         = 64 << 20
	crunchLogThrottleBytes             int64         = 64 << 10
	crunchLogThrottlePeriod            time.Duration = 60 * time.Second
	crunchLogThrottleLines             int64         = 1024
	crunchLogPartialLineThrottlePeriod time.Duration = 5 * time.Second
	crunchLogBytesPerEvent             int64         = 4 << 10
	crunchLogSecondsBetweenEvents      time.Duration = time.Second
)
194 // ArvLogWriter is an io.WriteCloser that processes each write by
195 // writing it through to another io.WriteCloser (typically a
196 // CollectionFileWriter) and, subject to rate limiting and batching,
// creating Arvados log entries through ArvClient.
197 type ArvLogWriter struct {
198 ArvClient IArvadosClient
// Downstream writer. It may be nil; Close sets it to nil after closing it.
201 writeCloser io.WriteCloser
// End of the current throttle period; Write resets the counters below after it.
205 logThrottleResetTime time.Time
// Lines and bytes accepted during the current throttle period.
206 logThrottleLinesSoFar int64
207 logThrottleBytesSoFar int64
// Bytes dropped since the last reset; reported as "Skipped N bytes of log".
208 logThrottleBytesSkipped int64
// False once a limit is exceeded; Write reopens it when a period ends.
209 logThrottleIsOpen bool
// Earliest time the next partial-line segment may pass the throttle.
210 logThrottlePartialLineNextAt time.Time
// True until the first partial segment after a complete line has been seen.
211 logThrottleFirstPartialLine bool
// Text batched for the next API log entry, and when the last one was sent.
212 bufToFlush bytes.Buffer
213 bufFlushedAt time.Time
// Write copies p to arvlog.writeCloser (when set), then rate-limits p line by
// line and appends the surviving text to bufToFlush. The buffered text is
// sent to the API server as a "logs" record when it exceeds
// crunchLogBytesPerEvent bytes, when this call comes at least
// crunchLogSecondsBetweenEvents after the last send, or when the writer is
// closing.
// NOTE(review): on success this returns the number of bytes sent to the API
// (0 when nothing was flushed) rather than len(p). io.Writer requires a
// non-nil error whenever n < len(p); confirm callers tolerate this.
217 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
218 // Write to the next writer in the chain (a file in Keep)
220 if arvlog.writeCloser != nil {
221 _, err1 = arvlog.writeCloser.Write(p)
224 // write to API after checking rate limit
228 if now.After(arvlog.logThrottleResetTime) {
229 // It has been more than crunchLogThrottlePeriod since the last
230 // checkpoint, so reset the throttle.
231 if arvlog.logThrottleBytesSkipped > 0 {
232 arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
235 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
236 arvlog.logThrottleBytesSoFar = 0
237 arvlog.logThrottleLinesSoFar = 0
238 arvlog.logThrottleBytesSkipped = 0
239 arvlog.logThrottleIsOpen = true
242 lines := bytes.Split(p, []byte("\n"))
244 for _, line := range lines {
245 // Short circuit the counting code if we're just going to throw
246 // away the data anyway.
247 if !arvlog.logThrottleIsOpen {
248 arvlog.logThrottleBytesSkipped += int64(len(line))
250 } else if len(line) == 0 {
255 logOpen, msg := arvlog.rateLimit(line, now)
257 arvlog.bufToFlush.WriteString(string(msg) + "\n")
// Send the batch if it is big enough, old enough, or the writer is closing,
// but only if there is something to send.
261 if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
262 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
263 arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
265 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
266 "object_uuid": arvlog.UUID,
267 "event_type": arvlog.loggingStream,
268 "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
269 err2 := arvlog.ArvClient.Create("logs", lr, nil)
// NOTE(review): the buffer is discarded below even if Create failed.
271 bytesWritten = arvlog.bufToFlush.Len()
272 arvlog.bufToFlush = bytes.Buffer{}
273 arvlog.bufFlushedAt = now
// NOTE(review): formatting a nil error with %s yields "%!s(<nil>)", so this
// message contains that text for whichever of err1/err2 is nil.
275 if err1 != nil || err2 != nil {
276 return 0, fmt.Errorf("%s ; %s", err1, err2)
280 return bytesWritten, nil
283 // Close sets closing and performs a final empty Write so any text still
// buffered for the API is sent, then closes the underlying writer, if any.
// The writer is set to nil afterwards, so a second Close does not close it
// again.
// NOTE(review): the error from the final Write (e.g. an API failure) is
// discarded.
284 func (arvlog *ArvLogWriter) Close() (err error) {
285 arvlog.closing = true
// An empty write should still reach the flush check, which honours closing.
286 arvlog.Write([]byte{})
287 if arvlog.writeCloser != nil {
288 err = arvlog.writeCloser.Close()
289 arvlog.writeCloser = nil
// lineRegexp matches a log line that begins with a run of non-space
// characters (presumably the timestamp ThrottledLogger prepends) followed
// by a single space, and captures everything after that space as
// submatch 1. rateLimit inspects that remainder for partial-line markers.
var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
296 // rateLimit applies the per-job byte cap and the per-period byte and line
297 // throttles to one line. It returns whether anything should be written and
// the bytes to write: either the line itself, or a notice message when a
// limit is exceeded. Partial-line segments (text after the leading token
// that starts and ends with "[...]") are additionally limited to one per
// crunchLogPartialLineThrottlePeriod.
298 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
300 lineSize := int64(len(line))
302 if arvlog.logThrottleIsOpen {
303 matches := lineRegexp.FindStringSubmatch(string(line))
305 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
306 // This is a partial line.
308 if arvlog.logThrottleFirstPartialLine {
309 // First partial segment since the last complete line: drop it and emit a one-time notice instead.
310 arvlog.logThrottleFirstPartialLine = false
311 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
312 arvlog.logThrottleBytesSkipped += lineSize
313 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
314 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
315 } else if now.After(arvlog.logThrottlePartialLineNextAt) {
316 // The throttle period has passed. Update timestamp and let it through.
317 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
320 arvlog.logThrottleBytesSkipped += lineSize
324 // Not a partial line so reset.
325 arvlog.logThrottlePartialLineNextAt = time.Time{}
326 arvlog.logThrottleFirstPartialLine = true
329 arvlog.bytesLogged += lineSize
330 arvlog.logThrottleBytesSoFar += lineSize
331 arvlog.logThrottleLinesSoFar += 1
// The per-job cap takes precedence over the per-period byte and line limits.
333 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
334 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
335 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
// Push the reset a year out so Write does not reopen the throttle for this job.
336 arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
337 arvlog.logThrottleIsOpen = false
339 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
340 remainingTime := arvlog.logThrottleResetTime.Sub(now)
341 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
342 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
343 arvlog.logThrottleIsOpen = false
345 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
346 remainingTime := arvlog.logThrottleResetTime.Sub(now)
347 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
348 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
349 arvlog.logThrottleIsOpen = false
354 if !arvlog.logThrottleIsOpen {
355 // Don't log anything if any limit has been exceeded. Just count lossage.
356 arvlog.logThrottleBytesSkipped += lineSize
360 // Yes, write to logs, but use our "rate exceeded" message
361 // instead of the log message that exceeded the limit.
362 message += " A complete log is still being written to Keep, and will be available when the job finishes."
363 return true, []byte(message)
365 return arvlog.logThrottleIsOpen, line
369 // loadLogThrottleParams loads the rate limit discovery config parameters,
// replacing the package defaults with values returned by clnt.Discovery.
// Numeric values are read as float64, and the period settings are taken as
// seconds and converted to time.Duration.
// NOTE(review): param.(float64) is an unchecked type assertion and panics if
// a key is present with a non-float64 value. The err checks that presumably
// guard each assignment are on lines not visible here; confirm.
370 func loadLogThrottleParams(clnt IArvadosClient) {
371 param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
373 crunchLimitLogBytesPerJob = int64(param.(float64))
376 param, err = clnt.Discovery("crunchLogThrottleBytes")
378 crunchLogThrottleBytes = int64(param.(float64))
381 param, err = clnt.Discovery("crunchLogThrottlePeriod")
383 crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
386 param, err = clnt.Discovery("crunchLogThrottleLines")
388 crunchLogThrottleLines = int64(param.(float64))
391 param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
393 crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
396 param, err = clnt.Discovery("crunchLogBytesPerEvent")
398 crunchLogBytesPerEvent = int64(param.(float64))
401 param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
403 crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))