14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17 // Timestamper is the signature for a function that takes a timestamp and
18 // returns a formatted string value.
19 type Timestamper func(t time.Time) string
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
27 // For stdout/stderr, ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send it to the Logger.
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer. It supports the
32 // "Logger" and "WriteCloser" interfaces.
//
// Write appends timestamped lines to an internal buffer; flusher swaps that
// buffer out and passes its contents to the downstream writer.
33 type ThrottledLogger struct {
44 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
// The fractional-seconds field of the layout is written with zeros rather
// than nines, so trailing zeros are kept and nine fractional digits are
// always printed.
45 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
47 // RFC3339Timestamp formats t as RFC3339NanoFixed.
// It matches the Timestamper signature and is the Timestamper that
// NewThrottledLogger installs.
48 func RFC3339Timestamp(t time.Time) string {
// No time zone conversion is done here; t is formatted as given.
49 return t.Format(RFC3339NanoFixed)
52 // Write prepends a timestamp to each line of the input data and
53 // appends to the internal buffer. Each line is also logged to
54 // tl.Immediate, if tl.Immediate is not nil.
//
// The timestamp is computed once per call (from the current UTC time, via
// tl.Timestamper), so every line of a multi-line write carries the same
// timestamp. tl.Mutex is released when the call returns.
55 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
57 defer tl.Mutex.Unlock()
// tl.buf is (re)created here; flusher sets it to nil when it takes a buffer.
60 tl.buf = &bytes.Buffer{}
63 now := tl.Timestamper(time.Now().UTC())
// Split p into lines; the scanner strips the line terminators.
// NOTE(review): bufio.Scanner stops at a line longer than its token limit
// (bufio.MaxScanTokenSize by default) and reports that through sc.Err();
// confirm that sc.Err() is checked after the loop.
64 sc := bufio.NewScanner(bytes.NewBuffer(p))
// Stop at the first error from writing into the buffer.
65 for err == nil && sc.Scan() {
66 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
67 if tl.Immediate != nil {
// Drop the trailing newline added above before handing the line to Immediate.
68 tl.Immediate.Print(out[:len(out)-1])
70 _, err = io.WriteString(tl.buf, out)
81 // Periodically check the current buffer; if not empty, pass its contents
82 // to tl.writer.
//
// NOTE(review): crunchLogSecondsBetweenEvents is an int whose name says
// seconds (fallback value 1), but time.Duration(n) is n nanoseconds, so the
// ticker below fires far more often than once per N seconds. Other code in
// this file scales by time.Second (see the crunchLogThrottlePeriod uses);
// this conversion most likely needs the same scaling.
83 func (tl *ThrottledLogger) flusher() {
84 ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
86 for stopping := false; !stopping; {
88 case _, open := <-tl.flush:
89 // if !open, flush tl.buf and exit the loop
94 var ready *bytes.Buffer
// Take the current buffer and leave nil behind for Write to replace.
97 ready, tl.buf = tl.buf, nil
100 if ready != nil && ready.Len() > 0 {
// The byte count and error returned by the downstream Write are discarded.
101 tl.writer.Write(ready.Bytes())
107 // Close the flusher goroutine and wait for it to complete, then close the
108 // underlying Writer.
// The final return value is whatever the underlying writer's Close returns.
109 func (tl *ThrottledLogger) Close() error {
117 return tl.writer.Close()
121 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
125 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
127 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
// The reader's buffer is sized by MaxLogLine, so ReadLine hands back a longer
// line in pieces, with isPrefix set on every piece except the last.
128 reader := bufio.NewReaderSize(in, MaxLogLine)
131 line, isPrefix, err := reader.ReadLine()
134 } else if err != nil {
// Report the read error in-band on the output; the Write result is ignored.
// NOTE(review): confirm the loop cannot spin if the same read error repeats.
135 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
// Nothing to add around this piece.
142 if prefix == "" && suffix == "" {
// Surround the piece with prefix/suffix (presumably the "[...]" markers that
// rateLimit looks for — their values are not visible here). The Write
// result is ignored.
145 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
148 // Set up prefix for following line
158 // NewThrottledLogger creates a new throttled logger that
159 // (a) prepends timestamps to each line
160 // (b) batches log messages and only calls the underlying Writer at most once
162 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
163 tl := &ThrottledLogger{}
// Capacity 1: one flush signal can be queued without a waiting receiver.
164 tl.flush = make(chan struct{}, 1)
// Unbuffered.
165 tl.stopped = make(chan struct{})
// The Logger writes back through tl.Write with no prefix and no flags, so
// the only timestamp on a line is the one tl.Timestamper produces.
167 tl.Logger = log.New(tl, "", 0)
168 tl.Timestamper = RFC3339Timestamp
173 // Log throttling rate limiting config parameters
// (package-level; assigned by loadLogThrottleParams, whose fallback values
// are quoted below).

// Limit on ArvLogWriter.bytesLogged; once exceeded, rateLimit closes the
// throttle and announces truncation. Fallback 67108864.
174 var crunchLimitLogBytesPerJob int64
// Bytes allowed per throttle period before logging is silenced. Fallback 65536.
175 var crunchLogThrottleBytes int64
// Length of a throttle period, in seconds. Fallback 60.
176 var crunchLogThrottlePeriod int
// Lines allowed per throttle period before logging is silenced. Fallback 1024.
177 var crunchLogThrottleLines int64
// Interval, in seconds, used when rate-limiting partial segments of long
// lines. Fallback 5.
178 var crunchLogPartialLineThrottlePeriod int
// Buffered size above which ArvLogWriter.Write creates a log entry.
// Fallback 4096.
179 var crunchLogBytesPerEvent int64
// Interval between flushes; seconds according to the name. Fallback 1.
// NOTE(review): its users convert it with time.Duration(n) and no
// time.Second factor, which makes it nanoseconds in practice — confirm.
180 var crunchLogSecondsBetweenEvents int
182 // ArvLogWriter is an io.WriteCloser that processes each write by
183 // writing it through to another io.WriteCloser (typically a
184 // CollectionFileWriter) and creating an Arvados log entry.
185 type ArvLogWriter struct {
// Client used by Write to create "logs" records.
186 ArvClient IArvadosClient
// Next writer in the chain; set to nil by Close.
189 writeCloser io.WriteCloser
// End of the current throttle period; Write resets the counters below once
// the current time is past it.
193 logThrottleResetTime time.Time
// Lines and bytes counted by rateLimit in the current period.
194 logThrottleLinesSoFar int64
195 logThrottleBytesSoFar int64
// Bytes dropped while the throttle was closed; reported as
// "Skipped N bytes of log" at the next reset.
196 logThrottleBytesSkipped int64
// False while logging to the API is silenced.
197 logThrottleIsOpen bool
// Time recorded by rateLimit for the most recent partial-line segment that
// arrived after the partial-line period had elapsed.
198 logThrottlePartialLineLastAt time.Time
// True until the first partial-line notice of the period has been issued.
199 logThrottleFirstPartialLine bool
// Text accumulated for the next "logs" record, and the time of the last flush.
200 bufToFlush bytes.Buffer
201 bufFlushedAt time.Time
// Write passes p to the next writer in the chain, then feeds each
// newline-separated line of p through the rate limiter into bufToFlush, and
// creates a "logs" record from bufToFlush when it is large or old enough.
//
// NOTE(review): the success return value is the size of the flushed buffer
// (see bytesWritten below), not len(p); the io.Writer contract expects
// n == len(p) when err is nil — confirm callers tolerate this.
204 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
205 // Write to the next writer in the chain (a file in Keep)
207 if arvlog.writeCloser != nil {
208 _, err1 = arvlog.writeCloser.Write(p)
211 // write to API after checking rate limit
215 if now.After(arvlog.logThrottleResetTime) {
216 // It has been more than throttle_period seconds since the last
217 // checkpoint; so reset the throttle
218 if arvlog.logThrottleBytesSkipped > 0 {
// Tell the reader how much was dropped during the period that just ended.
219 arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
// Start a new period of crunchLogThrottlePeriod seconds with fresh counters.
222 arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
223 arvlog.logThrottleBytesSoFar = 0
224 arvlog.logThrottleLinesSoFar = 0
225 arvlog.logThrottleBytesSkipped = 0
226 arvlog.logThrottleIsOpen = true
227 arvlog.logThrottlePartialLineLastAt = time.Time{}
228 arvlog.logThrottleFirstPartialLine = true
// The newline separators are removed by the split, so they are not counted
// in any of the byte totals below.
231 lines := bytes.Split(p, []byte("\n"))
233 for _, line := range lines {
234 // Short circuit the counting code if we're just going to throw
235 // away the data anyway.
236 if !arvlog.logThrottleIsOpen {
237 arvlog.logThrottleBytesSkipped += int64(len(line))
239 } else if len(line) == 0 {
// rateLimit returns either the line itself or a replacement notice.
244 logOpen, msg := arvlog.rateLimit(line, now)
245 arvlog.bufToFlush.WriteString(string(msg) + "\n")
// NOTE(review): rateLimit sets logThrottleIsOpen to false itself when a
// limit is exceeded but returns true alongside its notice; storing that
// result here would reopen the throttle — confirm this is intended.
246 arvlog.logThrottleIsOpen = logOpen
// Flush when the buffer exceeds crunchLogBytesPerEvent or enough time has
// passed since the last flush.
// NOTE(review): time.Duration(crunchLogSecondsBetweenEvents) is in
// nanoseconds, so the time condition is true on practically every call;
// it likely needs a time.Second factor.
249 if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
250 (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
// Build the "logs" record: the buffered text goes in properties["text"].
252 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
253 "object_uuid": arvlog.UUID,
254 "event_type": arvlog.loggingStream,
255 "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
256 err2 := arvlog.ArvClient.Create("logs", lr, nil)
// The buffer is emptied whether or not Create succeeded.
258 bytesWritten = arvlog.bufToFlush.Len()
259 arvlog.bufToFlush = bytes.Buffer{}
260 arvlog.bufFlushedAt = now
// NOTE(review): err2 is declared with := in the flush branch, so this check
// appears to sit inside that branch; if so, err1 is dropped on calls that do
// not flush. Also, whichever of err1/err2 is nil is still formatted with %s.
262 if err1 != nil || err2 != nil {
263 return 0, fmt.Errorf("%s ; %s", err1, err2)
267 return bytesWritten, nil
270 // Close the underlying writer
// The writer is closed at most once: after the first call writeCloser is
// nil and the close is skipped.
// NOTE(review): nothing visible here flushes text still held in bufToFlush —
// confirm whether pending log text can be lost on Close.
271 func (arvlog *ArvLogWriter) Close() (err error) {
272 if arvlog.writeCloser != nil {
273 err = arvlog.writeCloser.Close()
274 arvlog.writeCloser = nil
// lineRegexp matches a leading run of non-whitespace characters followed by
// one space, and captures the rest of the line. rateLimit uses it to look at
// the text after the first token (presumably the timestamp that
// ThrottledLogger.Write prepends).
279 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
281 // Test for hard cap on total output and for log throttling. Returns whether
282 // the log line should go to output or not. Returns message if limit exceeded.
//
// When a notice has been generated the result is (true, notice); the final
// return is (arvlog.logThrottleIsOpen, line).
283 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
285 lineSize := int64(len(line))
289 if arvlog.logThrottleIsOpen {
290 matches := lineRegexp.FindStringSubmatch(string(line))
// The text after the first token both starts and ends with "[...]".
292 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
// More than crunchLogPartialLineThrottlePeriod seconds have passed since the
// last recorded partial-line time: record this one.
295 if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
296 arvlog.logThrottlePartialLineLastAt = now
// Count the line against the per-period limits and the overall cap.
303 arvlog.logThrottleLinesSoFar += 1
304 arvlog.logThrottleBytesSoFar += lineSize
305 arvlog.bytesLogged += lineSize
// Overall cap exceeded.
308 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
309 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
310 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
// Push the reset time a year out so Write's periodic reset will not reopen
// the throttle.
311 arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
312 arvlog.logThrottleIsOpen = false
// Per-period byte limit exceeded.
314 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
// Time left until the reset; printed below as whole seconds.
315 remainingTime := arvlog.logThrottleResetTime.Sub(now)
316 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
317 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
318 arvlog.logThrottleIsOpen = false
// Per-period line limit exceeded.
320 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
321 remainingTime := arvlog.logThrottleResetTime.Sub(now)
322 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
323 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
324 arvlog.logThrottleIsOpen = false
// First partial-line segment since the reset: issue a notice, leave the
// throttle open.
326 } else if partialLine && arvlog.logThrottleFirstPartialLine {
327 arvlog.logThrottleFirstPartialLine = false
328 message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
329 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
334 if !arvlog.logThrottleIsOpen {
335 // Don't log anything if any limit has been exceeded. Just count lossage.
336 arvlog.logThrottleBytesSkipped += lineSize
340 // Yes, write to logs, but use our "rate exceeded" message
341 // instead of the log message that exceeded the limit.
342 message += " A complete log is still being written to Keep, and will be available when the job finishes."
343 return true, []byte(message)
344 } else if partialLine {
347 return arvlog.logThrottleIsOpen, line
351 // loadLogThrottleParams loads the rate limit discovery config parameters.
//
// Each value is requested with clnt.Discovery and stored in the package-level
// variable of the same name. The literal assigned just before each conversion
// is the fallback (presumably used when Discovery returns an error — the
// condition is not visible here).
//
// NOTE(review): param.(float64) is an unchecked type assertion; it panics if
// the discovery value has any other dynamic type.
352 func loadLogThrottleParams(clnt IArvadosClient) {
353 param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
// 64 MiB.
355 crunchLimitLogBytesPerJob = 67108864
357 crunchLimitLogBytesPerJob = int64(param.(float64))
360 param, err = clnt.Discovery("crunchLogThrottleBytes")
// 64 KiB.
362 crunchLogThrottleBytes = 65536
364 crunchLogThrottleBytes = int64(param.(float64))
367 param, err = clnt.Discovery("crunchLogThrottlePeriod")
// Seconds.
369 crunchLogThrottlePeriod = 60
371 crunchLogThrottlePeriod = int(param.(float64))
374 param, err = clnt.Discovery("crunchLogThrottleLines")
376 crunchLogThrottleLines = 1024
378 crunchLogThrottleLines = int64(param.(float64))
381 param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
// Seconds.
383 crunchLogPartialLineThrottlePeriod = 5
385 crunchLogPartialLineThrottlePeriod = int(param.(float64))
388 param, err = clnt.Discovery("crunchLogBytesPerEvent")
// 4 KiB.
390 crunchLogBytesPerEvent = 4096
392 crunchLogBytesPerEvent = int64(param.(float64))
395 param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
// The int conversion below truncates any fractional part of the value.
397 crunchLogSecondsBetweenEvents = 1
399 crunchLogSecondsBetweenEvents = int(param.(float64))