14 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17 // Timestamper is the signature for a function that takes a timestamp and
18 // returns a formatted string value.
19 type Timestamper func(t time.Time) string
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer. It supports the
32 // "Logger" and "WriteCloser" interfaces.
33 type ThrottledLogger struct {
45 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
// The fractional-seconds field uses zeros ("000000000") rather than nines,
// so trailing zeros are kept and it always has nine digits.
46 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
48 // RFC3339Timestamp formats t as RFC3339NanoFixed. It does not convert the
// time zone; callers in this file pass a UTC time.
49 func RFC3339Timestamp(t time.Time) string {
50 return t.Format(RFC3339NanoFixed)
53 // Write prepends a timestamp to each line of the input data and
54 // appends to the internal buffer. Each line is also logged to
55 // tl.Immediate, if tl.Immediate is not nil.
//
// The timestamp is computed once per call, so every line contained in p
// carries the same value.
56 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
58 defer tl.Mutex.Unlock()
61 tl.buf = &bytes.Buffer{}
// Disabled early-flush path: it would signal the flusher as soon as the
// buffer reached crunchLogBytesPerEvent bytes.
64 //if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush {
65 // tl.pendingFlush = true
66 // tl.flush <- struct{}{}
69 now := tl.Timestamper(time.Now().UTC())
// NOTE(review): bufio.Scanner stops with bufio.ErrTooLong on a line longer
// than bufio.MaxScanTokenSize (64 KiB) -- confirm sc.Err() is checked after
// the loop so such input is not dropped silently.
70 sc := bufio.NewScanner(bytes.NewBuffer(p))
71 for err == nil && sc.Scan() {
72 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
73 if tl.Immediate != nil {
// Pass the line on without its trailing newline.
74 tl.Immediate.Print(out[:len(out)-1])
76 _, err = io.WriteString(tl.buf, out)
87 // flusher periodically checks the current buffer; if it is not empty, its
88 // contents are written to the downstream writer. It also wakes up whenever
// tl.flush is signalled or closed.
//
// crunchLogSecondsBetweenEvents is a plain count of seconds, so it has to be
// scaled by time.Second: time.Duration(n) on its own means n nanoseconds,
// which would make the ticker fire continuously.
//
// NOTE(review): time.NewTicker panics on a non-positive interval, so a
// configured value of 0 would crash here -- confirm whether 0 is possible.
89 func (tl *ThrottledLogger) flusher() {
90 ticker := time.NewTicker(time.Second * time.Duration(crunchLogSecondsBetweenEvents))
92 for stopping := false; !stopping; {
94 case _, open := <-tl.flush:
95 // if !open, flush tl.buf and exit the loop
100 var ready *bytes.Buffer
// Swap in a fresh buffer so the old one can be written out separately.
103 ready, tl.buf = tl.buf, &bytes.Buffer{}
104 tl.pendingFlush = false
107 if ready != nil && ready.Len() > 0 {
// NOTE(review): the error returned by the downstream Write is discarded.
108 tl.writer.Write(ready.Bytes())
114 // Close stops the flusher goroutine and waits for it to complete, then closes
115 // the underlying Writer and returns that Writer's Close error.
116 func (tl *ThrottledLogger) Close() error {
124 return tl.writer.Close()
128 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
132 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
134 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
// The read buffer holds MaxLogLine bytes; bufio's ReadLine returns a longer
// line in pieces, setting isPrefix on every piece except the last.
135 reader := bufio.NewReaderSize(in, MaxLogLine)
138 line, isPrefix, err := reader.ReadLine()
141 } else if err != nil {
// Report the read failure inside the log stream itself.
142 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
149 if prefix == "" && suffix == "" {
// Wrap the piece in its prefix/suffix markers; fmt.Sprint adds no spaces
// between string operands.
152 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
155 // Set up prefix for following line
165 // NewThrottledLogger creates a new throttled logger that
166 // (a) prepends timestamps to each line
167 // (b) batches log messages and only calls the underlying Writer
168 // at most once per "crunchLogSecondsBetweenEvents" seconds.
169 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
170 tl := &ThrottledLogger{}
// Buffered with capacity 1 so one flush request can be queued.
171 tl.flush = make(chan struct{}, 1)
172 tl.stopped = make(chan struct{})
// The embedded Logger writes back into tl itself, with no prefix or flags,
// so tl.Write is the only place that adds a timestamp.
174 tl.Logger = log.New(tl, "", 0)
175 tl.Timestamper = RFC3339Timestamp
180 // Log throttling rate limiting config parameters
// They are assigned by loadLogThrottleParams. The period/interval values are
// whole numbers of seconds and must be multiplied by time.Second before being
// used as a time.Duration.
181 var crunchLimitLogBytesPerJob int64 // hard cap on total bytes sent to the logs API
182 var crunchLogThrottleBytes int64 // byte budget per throttle period
183 var crunchLogThrottlePeriod int // throttle period length, in seconds
184 var crunchLogThrottleLines int64 // line budget per throttle period
185 var crunchLogPartialLineThrottlePeriod int // seconds used to rate-limit partial segments of long lines
186 var crunchLogBytesPerEvent int64 // buffered size above which a log event is sent
187 var crunchLogSecondsBetweenEvents int // interval between log events, in seconds
189 // ArvLogWriter is an io.WriteCloser that processes each write by
190 // writing it through to another io.WriteCloser (typically a
191 // CollectionFileWriter) and creating an Arvados log entry.
192 type ArvLogWriter struct {
193 ArvClient IArvadosClient
196 writeCloser io.WriteCloser // next writer in the chain; set to nil by Close
200 logThrottleResetTime time.Time // when the current throttle period ends
201 logThrottleLinesSoFar int64 // lines counted in the current period
202 logThrottleBytesSoFar int64 // bytes counted in the current period
203 logThrottleBytesSkipped int64 // bytes dropped while the throttle was closed
204 logThrottleIsOpen bool // false once a limit has been exceeded
205 logThrottlePartialLineLastAt time.Time // start of the current partial-line throttle interval
206 logThrottleFirstPartialLine bool // true until the partial-line notice has been emitted
207 bufToFlush bytes.Buffer // text waiting to be sent to the logs API
208 bufFlushedAt time.Time // time of the last send to the logs API
// Write passes p through to the next writer in the chain (when one is set),
// then applies rate limiting line by line and buffers the surviving text for
// the Arvados logs API. The buffer is sent as a single "logs" record once it
// exceeds crunchLogBytesPerEvent bytes, or once crunchLogSecondsBetweenEvents
// seconds have passed since the previous send.
//
// NOTE(review): the count returned on success is the number of buffered bytes
// sent to the API, not len(p) -- confirm callers do not rely on the io.Writer
// contract for n.
211 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
212 // Write to the next writer in the chain (a file in Keep)
214 if arvlog.writeCloser != nil {
215 _, err1 = arvlog.writeCloser.Write(p)
218 // write to API after checking rate limit
222 if now.After(arvlog.logThrottleResetTime) {
223 // It has been more than throttle_period seconds since the last
224 // checkpoint; so reset the throttle
225 if arvlog.logThrottleBytesSkipped > 0 {
226 arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
229 arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
230 arvlog.logThrottleBytesSoFar = 0
231 arvlog.logThrottleLinesSoFar = 0
232 arvlog.logThrottleBytesSkipped = 0
233 arvlog.logThrottleIsOpen = true
234 arvlog.logThrottlePartialLineLastAt = time.Time{}
235 arvlog.logThrottleFirstPartialLine = true
238 lines := bytes.Split(p, []byte("\n"))
240 for _, line := range lines {
241 // Short circuit the counting code if we're just going to throw
242 // away the data anyway.
243 if !arvlog.logThrottleIsOpen {
244 arvlog.logThrottleBytesSkipped += int64(len(line))
246 } else if len(line) == 0 {
251 logOpen, msg := arvlog.rateLimit(line, now)
253 arvlog.bufToFlush.WriteString(string(msg) + "\n")
// crunchLogSecondsBetweenEvents is a count of seconds; scale it by
// time.Second, because a bare time.Duration(n) means n nanoseconds and would
// make the time condition true on practically every call.
257 if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
258 (now.Sub(arvlog.bufFlushedAt) >= time.Second*time.Duration(crunchLogSecondsBetweenEvents)) {
260 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
261 "object_uuid": arvlog.UUID,
262 "event_type": arvlog.loggingStream,
263 "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
264 err2 := arvlog.ArvClient.Create("logs", lr, nil)
266 bytesWritten = arvlog.bufToFlush.Len()
267 arvlog.bufToFlush = bytes.Buffer{}
268 arvlog.bufFlushedAt = now
270 if err1 != nil || err2 != nil {
// Use %v: with %s a nil error is rendered as "%!s(<nil>)" whenever only one
// of the two writes failed.
271 return 0, fmt.Errorf("%v ; %v", err1, err2)
275 return bytesWritten, nil
278 // Close closes the underlying writer, if one is set, and returns its error.
// The writer is then cleared, so a repeated Close does nothing and later
// Write calls skip the pass-through step.
279 func (arvlog *ArvLogWriter) Close() (err error) {
280 if arvlog.writeCloser != nil {
281 err = arvlog.writeCloser.Close()
282 arvlog.writeCloser = nil
// lineRegexp matches a leading run of non-space characters followed by one
// space, and captures the rest of the line. rateLimit uses the captured text
// to detect partial-line segments.
// NOTE(review): presumably the leading token is the timestamp added by
// ThrottledLogger.Write -- confirm.
287 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
289 // rateLimit tests for the hard cap on total output and for log throttling.
290 // The first result reports whether the returned bytes should go to the
// output. The second result is the original line, or a replacement notice
// when a limit has just been exceeded.
291 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
293 lineSize := int64(len(line))
296 if arvlog.logThrottleIsOpen {
297 matches := lineRegexp.FindStringSubmatch(string(line))
// A segment counts as partial only when the text after the first token both
// starts and ends with the "[...]" marker.
299 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
// Start a new partial-line interval once the previous one has elapsed.
301 if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
302 arvlog.logThrottlePartialLineLastAt = now
303 arvlog.logThrottleFirstPartialLine = true
307 arvlog.logThrottleLinesSoFar += 1
308 arvlog.logThrottleBytesSoFar += lineSize
309 arvlog.bytesLogged += lineSize
311 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
312 message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
313 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
// Move the reset time a year ahead so Write does not reopen the throttle.
314 arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
315 arvlog.logThrottleIsOpen = false
317 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
318 remainingTime := arvlog.logThrottleResetTime.Sub(now)
319 message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
320 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
321 arvlog.logThrottleIsOpen = false
323 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
324 remainingTime := arvlog.logThrottleResetTime.Sub(now)
325 message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
326 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
327 arvlog.logThrottleIsOpen = false
329 } else if partialLine && arvlog.logThrottleFirstPartialLine {
// Emit the partial-line notice once per interval.
330 arvlog.logThrottleFirstPartialLine = false
331 message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
332 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
337 if !arvlog.logThrottleIsOpen {
338 // Don't log anything if any limit has been exceeded. Just count lossage.
339 arvlog.logThrottleBytesSkipped += lineSize
343 // Yes, write to logs, but use our "rate exceeded" message
344 // instead of the log message that exceeded the limit.
345 message += " A complete log is still being written to Keep, and will be available when the job finishes."
346 return true, []byte(message)
348 return arvlog.logThrottleIsOpen, line
352 // loadLogThrottleParams loads the rate limit discovery config parameters
// into the package-level throttle variables, one clnt.Discovery lookup per
// parameter. Each parameter also has a hard-coded default assignment.
//
// NOTE(review): the param.(float64) assertions are unchecked and panic if a
// discovery value has any other type -- confirm the discovery document always
// supplies numbers.
353 func loadLogThrottleParams(clnt IArvadosClient) {
354 param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
356 crunchLimitLogBytesPerJob = 67108864
358 crunchLimitLogBytesPerJob = int64(param.(float64))
361 param, err = clnt.Discovery("crunchLogThrottleBytes")
363 crunchLogThrottleBytes = 65536
365 crunchLogThrottleBytes = int64(param.(float64))
368 param, err = clnt.Discovery("crunchLogThrottlePeriod")
370 crunchLogThrottlePeriod = 60
372 crunchLogThrottlePeriod = int(param.(float64))
375 param, err = clnt.Discovery("crunchLogThrottleLines")
377 crunchLogThrottleLines = 1024
379 crunchLogThrottleLines = int64(param.(float64))
382 param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
384 crunchLogPartialLineThrottlePeriod = 5
386 crunchLogPartialLineThrottlePeriod = int(param.(float64))
389 param, err = clnt.Discovery("crunchLogBytesPerEvent")
391 crunchLogBytesPerEvent = 4096
393 crunchLogBytesPerEvent = int64(param.(float64))
396 param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
// Stored as a whole number of seconds; int() drops any fractional part.
398 crunchLogSecondsBetweenEvents = 1
400 crunchLogSecondsBetweenEvents = int(param.(float64))