14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Timestamper is the signature for a function that takes a timestamp and
// returns it formatted as a string.
type Timestamper func(t time.Time) string

// Logging plumbing:
//
// ThrottledLogger.Logger -> ThrottledLogger.Write ->
// ThrottledLogger.buf -> ThrottledLogger.flusher ->
// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
//
// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
// data from the stdout/stderr Reader and send to the Logger.

// ThrottledLogger accepts writes, prepends a timestamp to each line of the
// write, and periodically flushes to a downstream writer. It supports the
// "Logger" and "WriteCloser" interfaces.
type ThrottledLogger struct {
	// Logger is expected to write back into this ThrottledLogger, so
	// Printf-style calls get the same timestamping and batching as Write.
	*log.Logger

	// buf accumulates timestamped lines until the flusher takes it.
	// Guarded by Mutex.
	buf *bytes.Buffer
	sync.Mutex

	// writer receives the batched output from the flusher and is closed
	// by Close.
	writer io.WriteCloser

	// stopping is closed to ask the flusher to do a final flush and exit;
	// the flusher closes stopped once it has done so.
	stopping chan struct{}
	stopped  chan struct{}

	// Timestamper formats the timestamp prepended to each line.
	Timestamper

	// Immediate, if not nil, also receives every line as soon as it is
	// written, bypassing the batching.
	Immediate *log.Logger
}

// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp formats t as RFC3339NanoFixed.
func RFC3339Timestamp(t time.Time) string {
	return t.Format(RFC3339NanoFixed)
}

// Write prepends a timestamp to each line of the input data and
// appends to the internal buffer. Each line is also logged to
// tl.Immediate, if tl.Immediate is not nil.
//
// All lines of one call share a single timestamp. On success Write returns
// len(p), as io.Writer requires; the buffered text is passed downstream
// later by the flusher.
func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
	tl.Mutex.Lock()
	defer tl.Mutex.Unlock()

	if tl.buf == nil {
		tl.buf = &bytes.Buffer{}
	}

	now := tl.Timestamper(time.Now().UTC())
	sc := bufio.NewScanner(bytes.NewBuffer(p))
	// By default bufio.Scanner gives up (ErrTooLong) on lines longer than
	// bufio.MaxScanTokenSize, which would drop the rest of a large message.
	// No line in p can be longer than p itself, so allow that much.
	sc.Buffer(nil, len(p)+1)
	for err == nil && sc.Scan() {
		out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
		if tl.Immediate != nil {
			tl.Immediate.Print(out[:len(out)-1])
		}
		_, err = io.WriteString(tl.buf, out)
	}
	if err == nil {
		err = sc.Err()
	}
	if err != nil {
		return 0, err
	}
	return len(p), nil
}

// flusher runs as a goroutine for the lifetime of the logger. Once a second
// it takes whatever Write has buffered and hands it to the downstream writer
// in a single Write call. When tl.stopping is closed it performs one final
// flush, then closes tl.stopped and returns.
func (tl *ThrottledLogger) flusher() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for stopping := false; !stopping; {
		select {
		case <-tl.stopping:
			// flush tl.buf, then exit the loop
			stopping = true
		case <-ticker.C:
		}

		// Swap the buffer out under the lock so the (possibly slow)
		// downstream write below does not block concurrent Writes.
		var ready *bytes.Buffer
		tl.Mutex.Lock()
		ready, tl.buf = tl.buf, nil
		tl.Mutex.Unlock()

		if ready != nil && ready.Len() > 0 {
			// Best effort: this goroutine has no caller to report an
			// error to.
			tl.writer.Write(ready.Bytes())
		}
	}
	close(tl.stopped)
}

// Close stops the flusher goroutine, waits for its final flush to complete,
// then closes the underlying Writer. A later second call does not panic; it
// waits on the already-stopped flusher and closes the Writer again.
func (tl *ThrottledLogger) Close() error {
	select {
	case <-tl.stopping:
		// already stopped
	default:
		close(tl.stopping)
	}
	<-tl.stopped
	return tl.writer.Close()
}
121 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
125 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
// lines split into several writes.
//
// The input is read through a bufio.Reader whose buffer is sized to
// MaxLogLine, so ReadLine hands back a long line in pieces no larger than
// that buffer and sets isPrefix on every piece but the last; each piece is
// written separately. done is presumably signalled once in is exhausted,
// but that part of the body is not visible here — TODO confirm.
127 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
128 reader := bufio.NewReaderSize(in, MaxLogLine)
131 line, isPrefix, err := reader.ReadLine()
134 } else if err != nil {
// The read error is reported in-band, as a line written to writer.
// NOTE(review): whether the loop stops after a non-EOF error is not
// visible here; if it keeps calling ReadLine, a persistently failing
// reader would repeat this message indefinitely — confirm.
135 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
// NOTE(review): prefix and suffix are declared in lines not visible here;
// presumably they are the continuation markers attached to the pieces of
// a line that had to be split.
142 if prefix == "" && suffix == "" {
145 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
148 // Set up prefix for following line
158 // NewThrottledLogger creates a new thottled logger that
159 // (a) prepends timestamps to each line
160 // (b) batches log messages and only calls the underlying Writer at most once
162 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
163 tl := &ThrottledLogger{}
164 tl.stopping = make(chan struct{})
165 tl.stopped = make(chan struct{})
167 tl.Logger = log.New(tl, "", 0)
168 tl.Timestamper = RFC3339Timestamp
173 // ArvLogWriter is an io.WriteCloser that processes each write by
174 // writing it through to another io.WriteCloser (typically a
175 // CollectionFileWriter) and creating an Arvados log entry.
176 type ArvLogWriter struct {
177 ArvClient IArvadosClient
180 writeCloser io.WriteCloser
184 logThrottleResetTime time.Time
185 logThrottleLinesSoFar int64
186 logThrottleBytesSoFar int64
187 logThrottleBytesSkipped int64
188 logThrottleIsOpen bool
189 logThrottlePartialLineLastAt time.Time
190 logThrottleFirstPartialLine bool
191 bufToFlush bytes.Buffer
192 bufFlushedAt time.Time
194 // rate limiting config parameters
195 crunchLimitLogBytesPerJob int64
196 crunchLogThrottleBytes int64
197 crunchLogThrottlePeriod int
198 crunchLogThrottleLines int64
199 crunchLogPartialLineThrottlePeriod int
200 crunchLogBytesPerEvent int64
201 crunchLogSecondsBetweenEvents int
204 // NewArvLogWriter creates new ArvLogWriter and loads the rate limiting config params
205 func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
206 w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
208 // load the rate limit discovery config paramters
209 param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
211 w.crunchLimitLogBytesPerJob = 67108864
213 w.crunchLimitLogBytesPerJob = int64(param.(float64))
216 param, err = clnt.Discovery("crunchLogThrottleBytes")
218 w.crunchLogThrottleBytes = 65536
220 w.crunchLogThrottleBytes = int64(param.(float64))
223 param, err = clnt.Discovery("crunchLogThrottlePeriod")
225 w.crunchLogThrottlePeriod = 60
227 w.crunchLogThrottlePeriod = int(param.(float64))
230 param, err = clnt.Discovery("crunchLogThrottleLines")
232 w.crunchLogThrottleLines = 1024
234 w.crunchLogThrottleLines = int64(param.(float64))
237 param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
239 w.crunchLogPartialLineThrottlePeriod = 5
241 w.crunchLogPartialLineThrottlePeriod = int(param.(float64))
244 param, err = clnt.Discovery("crunchLogBytesPerEvent")
246 w.crunchLogBytesPerEvent = 4096
248 w.crunchLogBytesPerEvent = int64(param.(float64))
251 param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
253 w.crunchLogSecondsBetweenEvents = 1
255 w.crunchLogSecondsBetweenEvents = int(param.(float64))
261 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
262 // Write to the next writer in the chain (a file in Keep)
264 if arvlog.writeCloser != nil {
265 _, err1 = arvlog.writeCloser.Write(p)
268 // write to API after checking rate limit
272 if now.After(arvlog.logThrottleResetTime) {
273 // It has been more than throttle_period seconds since the last
274 // checkpoint; so reset the throttle
275 if arvlog.logThrottleBytesSkipped > 0 {
276 arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
279 arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
280 arvlog.logThrottleBytesSoFar = 0
281 arvlog.logThrottleLinesSoFar = 0
282 arvlog.logThrottleBytesSkipped = 0
283 arvlog.logThrottleIsOpen = true
284 arvlog.logThrottlePartialLineLastAt = time.Time{}
285 arvlog.logThrottleFirstPartialLine = true
288 lines := bytes.Split(p, []byte("\n"))
290 for _, line := range lines {
291 // Short circuit the counting code if we're just going to throw
292 // away the data anyway.
293 if !arvlog.logThrottleIsOpen {
294 arvlog.logThrottleBytesSkipped += int64(len(line))
296 } else if len(line) == 0 {
301 logOpen, msg := arvlog.rateLimit(line, now)
302 arvlog.bufToFlush.WriteString(string(msg) + "\n")
303 arvlog.logThrottleIsOpen = logOpen
306 if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
307 (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
309 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
310 "object_uuid": arvlog.UUID,
311 "event_type": arvlog.loggingStream,
312 "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
313 err2 := arvlog.ArvClient.Create("logs", lr, nil)
315 bytesWritten = arvlog.bufToFlush.Len()
316 arvlog.bufToFlush = bytes.Buffer{}
317 arvlog.bufFlushedAt = now
319 if err1 != nil || err2 != nil {
320 return 0, fmt.Errorf("%s ; %s", err1, err2)
324 return bytesWritten, nil
327 // Close the underlying writer
328 func (arvlog *ArvLogWriter) Close() (err error) {
329 if arvlog.writeCloser != nil {
330 err = arvlog.writeCloser.Close()
331 arvlog.writeCloser = nil
var (
	// lineRegexp captures, as submatch 1, everything after the leading
	// run of non-space characters and the single space that follows it.
	// Lines reaching rateLimit presumably begin with a timestamp, so the
	// capture is the message text without it.
	lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
)
338 // Test for hard cap on total output and for log throttling. Returns whether
339 // the log line should go to output or not. Returns message if limit exceeded.
340 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
342 lineSize := int64(len(line))
346 if arvlog.logThrottleIsOpen {
347 matches := lineRegexp.FindStringSubmatch(string(line))
349 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
352 if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
353 arvlog.logThrottlePartialLineLastAt = now
360 arvlog.logThrottleLinesSoFar += 1
361 arvlog.logThrottleBytesSoFar += lineSize
362 arvlog.bytesLogged += lineSize
365 if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
366 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
367 arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
368 arvlog.logThrottleIsOpen = false
370 } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
371 remainingTime := arvlog.logThrottleResetTime.Sub(now)
372 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
373 arvlog.logThrottleIsOpen = false
375 } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
376 remainingTime := arvlog.logThrottleResetTime.Sub(now)
377 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
378 arvlog.logThrottleIsOpen = false
380 } else if partialLine && arvlog.logThrottleFirstPartialLine {
381 arvlog.logThrottleFirstPartialLine = false
382 message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
387 if !arvlog.logThrottleIsOpen {
388 // Don't log anything if any limit has been exceeded. Just count lossage.
389 arvlog.logThrottleBytesSkipped += lineSize
393 // Yes, write to logs, but use our "rate exceeded" message
394 // instead of the log message that exceeded the limit.
395 message += " A complete log is still being written to Keep, and will be available when the job finishes."
396 return true, []byte(message)
397 } else if partialLine {
400 return arvlog.logThrottleIsOpen, line