14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17 // Timestamper is the signature for a function that takes a timestamp and
18 // returns a formatted string value.
19 type Timestamper func(t time.Time) string
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer. It supports the
32 // "Logger" and "WriteCloser" interfaces.
33 type ThrottledLogger struct {
45 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
46 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" // ".000000000" keeps trailing zeros, so nine fractional digits are always printed
48 // RFC3339Timestamp formats t as RFC3339NanoFixed.
49 func RFC3339Timestamp(t time.Time) string {
50 return t.Format(RFC3339NanoFixed)
53 // Write prepends a timestamp to each line of the input data and
54 // appends to the internal buffer. Each line is also logged to
55 // tl.Immediate, if tl.Immediate is not nil.
56 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
58 defer tl.Mutex.Unlock()
61 tl.buf = &bytes.Buffer{}
64 now := tl.Timestamper(time.Now().UTC())
65 sc := bufio.NewScanner(bytes.NewBuffer(p))
66 for err == nil && sc.Scan() {
67 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
68 if tl.Immediate != nil {
69 tl.Immediate.Print(out[:len(out)-1])
71 _, err = io.WriteString(tl.buf, out)
80 if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
81 // Non-blocking send. Try to send a flush if it is ready to
82 // accept it. Otherwise do nothing because a flush is already
85 case tl.flush <- struct{}{}:
93 // Periodically check the current buffer; if not empty, send it on the
94 // channel to the goWriter goroutine.
95 func (tl *ThrottledLogger) flusher() {
96 ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
98 for stopping := false; !stopping; {
100 case _, open := <-tl.flush:
101 // if !open, will flush tl.buf and exit the loop
106 var ready *bytes.Buffer
109 ready, tl.buf = tl.buf, &bytes.Buffer{}
112 if ready != nil && ready.Len() > 0 {
113 tl.writer.Write(ready.Bytes())
119 // Close the flusher goroutine and wait for it to complete, then close the
120 // underlying Writer.
121 func (tl *ThrottledLogger) Close() error {
129 return tl.writer.Close()
133 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
137 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
139 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
140 reader := bufio.NewReaderSize(in, MaxLogLine)
143 line, isPrefix, err := reader.ReadLine()
146 } else if err != nil {
147 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
154 if prefix == "" && suffix == "" {
157 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
160 // Set up prefix for following line
170 // NewThrottledLogger creates a new throttled logger that
171 // (a) prepends timestamps to each line
172 // (b) batches log messages and only calls the underlying Writer
173 // at most once per "crunchLogSecondsBetweenEvents" seconds.
174 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
175 tl := &ThrottledLogger{}
176 tl.flush = make(chan struct{}, 1)
177 tl.stopped = make(chan struct{})
179 tl.Logger = log.New(tl, "", 0)
180 tl.Timestamper = RFC3339Timestamp
185 // Log throttling and rate-limiting config parameters. These are defaults; loadLogThrottleParams may overwrite them with values returned by clnt.Discovery.
186 var crunchLimitLogBytesPerJob int64 = 67108864 // cap on arvlog.bytesLogged (64 MiB); once exceeded, rateLimit closes the throttle and the log is truncated
187 var crunchLogThrottleBytes int64 = 65536 // bytes allowed per throttle period before rateLimit silences logging
188 var crunchLogThrottlePeriod time.Duration = time.Second * 60 // interval after which ArvLogWriter.Write resets the throttle counters
189 var crunchLogThrottleLines int64 = 1024 // lines allowed per throttle period before rateLimit silences logging
190 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 // interval added to now to set logThrottlePartialLineNextAt for "[...]" partial lines
191 var crunchLogBytesPerEvent int64 = 4096 // buffered-byte threshold that triggers a flush / log event
192 var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1 // flusher ticker interval and elapsed-time threshold for a log event (a Duration, despite the name)
194 // ArvLogWriter is an io.WriteCloser that processes each write by
195 // writing it through to another io.WriteCloser (typically a
196 // CollectionFileWriter) and creating an Arvados log entry.
197 type ArvLogWriter struct {
198 ArvClient IArvadosClient
201 writeCloser io.WriteCloser
205 logThrottleResetTime time.Time
206 logThrottleLinesSoFar int64
207 logThrottleBytesSoFar int64
208 logThrottleBytesSkipped int64
209 logThrottleIsOpen bool
210 logThrottlePartialLineNextAt time.Time
211 logThrottleFirstPartialLine bool
212 bufToFlush bytes.Buffer
213 bufFlushedAt time.Time
216 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
217 // Write to the next writer in the chain (a file in Keep)
219 if arvlog.writeCloser != nil {
220 _, err1 = arvlog.writeCloser.Write(p)
223 // write to API after checking rate limit
227 if now.After(arvlog.logThrottleResetTime) {
228 // It has been more than throttle_period seconds since the last
229 // checkpoint; so reset the throttle
230 if arvlog.logThrottleBytesSkipped > 0 {
231 arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
234 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
235 arvlog.logThrottleBytesSoFar = 0
236 arvlog.logThrottleLinesSoFar = 0
237 arvlog.logThrottleBytesSkipped = 0
238 arvlog.logThrottleIsOpen = true
241 lines := bytes.Split(p, []byte("\n"))
243 for _, line := range lines {
244 // Short circuit the counting code if we're just going to throw
245 // away the data anyway.
246 if !arvlog.logThrottleIsOpen {
247 arvlog.logThrottleBytesSkipped += int64(len(line))
249 } else if len(line) == 0 {
254 logOpen, msg := arvlog.rateLimit(line, now)
256 arvlog.bufToFlush.WriteString(string(msg) + "\n")
260 if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
261 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) {
263 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
264 "object_uuid": arvlog.UUID,
265 "event_type": arvlog.loggingStream,
266 "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
267 err2 := arvlog.ArvClient.Create("logs", lr, nil)
269 bytesWritten = arvlog.bufToFlush.Len()
270 arvlog.bufToFlush = bytes.Buffer{}
271 arvlog.bufFlushedAt = now
273 if err1 != nil || err2 != nil {
274 return 0, fmt.Errorf("%s ; %s", err1, err2)
278 return bytesWritten, nil
281 // Close the underlying writer
282 func (arvlog *ArvLogWriter) Close() (err error) {
283 if arvlog.writeCloser != nil {
284 err = arvlog.writeCloser.Close()
285 arvlog.writeCloser = nil
290 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`) // group 1 is the text after the leading non-space token and one space; rateLimit checks it for "[...]" markers
292 // Test for hard cap on total output and for log throttling. Returns whether
293 // the log line should go to output or not. Returns message if limit exceeded.
294 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
296 lineSize := int64(len(line))
298 if arvlog.logThrottleIsOpen {
299 matches := lineRegexp.FindStringSubmatch(string(line))
301 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
302 // This is a partial line.
304 if arvlog.logThrottleFirstPartialLine {
305 // Partial should be suppressed. First time this is happening for this line so provide a message instead.
306 arvlog.logThrottleFirstPartialLine = false
307 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
308 arvlog.logThrottleBytesSkipped += lineSize
309 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
310 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
311 } else if now.After(arvlog.logThrottlePartialLineNextAt) {
312 // The throttle period has passed. Update timestamp and let it through.
313 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
316 arvlog.logThrottleBytesSkipped += lineSize
320 // Not a partial line so reset.
321 arvlog.logThrottlePartialLineNextAt = time.Time{}
322 arvlog.logThrottleFirstPartialLine = true
325 arvlog.bytesLogged += lineSize
326 arvlog.logThrottleBytesSoFar += lineSize
327 arvlog.logThrottleLinesSoFar += 1
329 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
330 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
331 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
332 arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
333 arvlog.logThrottleIsOpen = false
335 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
336 remainingTime := arvlog.logThrottleResetTime.Sub(now)
337 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
338 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
339 arvlog.logThrottleIsOpen = false
341 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
342 remainingTime := arvlog.logThrottleResetTime.Sub(now)
343 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
344 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
345 arvlog.logThrottleIsOpen = false
350 if !arvlog.logThrottleIsOpen {
351 // Don't log anything if any limit has been exceeded. Just count lossage.
352 arvlog.logThrottleBytesSkipped += lineSize
356 // Yes, write to logs, but use our "rate exceeded" message
357 // instead of the log message that exceeded the limit.
358 message += " A complete log is still being written to Keep, and will be available when the job finishes."
359 return true, []byte(message)
361 return arvlog.logThrottleIsOpen, line
365 // load the rate limit discovery config parameters
366 func loadLogThrottleParams(clnt IArvadosClient) {
367 param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
369 crunchLimitLogBytesPerJob = int64(param.(float64))
372 param, err = clnt.Discovery("crunchLogThrottleBytes")
374 crunchLogThrottleBytes = int64(param.(float64))
377 param, err = clnt.Discovery("crunchLogThrottlePeriod")
379 crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
382 param, err = clnt.Discovery("crunchLogThrottleLines")
384 crunchLogThrottleLines = int64(param.(float64))
387 param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
389 crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
392 param, err = clnt.Discovery("crunchLogBytesPerEvent")
394 crunchLogBytesPerEvent = int64(param.(float64))
397 param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
399 crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))