flush chan struct{}
stopped chan struct{}
Timestamper
- Immediate *log.Logger
+ Immediate *log.Logger
+ pendingFlush bool
}
// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
tl.buf = &bytes.Buffer{}
}
+ //if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush {
+ // tl.pendingFlush = true
+ // tl.flush <- struct{}{}
+ //}
+
now := tl.Timestamper(time.Now().UTC())
sc := bufio.NewScanner(bytes.NewBuffer(p))
for err == nil && sc.Scan() {
var ready *bytes.Buffer
tl.Mutex.Lock()
- ready, tl.buf = tl.buf, nil
+ ready, tl.buf = tl.buf, &bytes.Buffer{}
+ tl.pendingFlush = false
tl.Mutex.Unlock()
if ready != nil && ready.Len() > 0 {
// NewThrottledLogger creates a new throttled logger that
// (a) prepends timestamps to each line
-// (b) batches log messages and only calls the underlying Writer at most once
-// per second.
+// (b) batches log messages and only calls the underlying Writer
+// at most once per "crunchLogSecondsBetweenEvents" seconds.
func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
tl := &ThrottledLogger{}
tl.flush = make(chan struct{}, 1)
// check rateLimit
logOpen, msg := arvlog.rateLimit(line, now)
- arvlog.bufToFlush.WriteString(string(msg) + "\n")
- arvlog.logThrottleIsOpen = logOpen
+ if logOpen {
+ arvlog.bufToFlush.WriteString(string(msg) + "\n")
+ }
}
if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
message := ""
lineSize := int64(len(line))
partialLine := false
- skipCounts := false
if arvlog.logThrottleIsOpen {
matches := lineRegexp.FindStringSubmatch(string(line))
if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
partialLine = true
-
if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
arvlog.logThrottlePartialLineLastAt = now
- } else {
- skipCounts = true
+ arvlog.logThrottleFirstPartialLine = true
}
}
- if !skipCounts {
- arvlog.logThrottleLinesSoFar += 1
- arvlog.logThrottleBytesSoFar += lineSize
- arvlog.bytesLogged += lineSize
- }
+ arvlog.logThrottleLinesSoFar += 1
+ arvlog.logThrottleBytesSoFar += lineSize
+ arvlog.bytesLogged += lineSize
if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
// instead of the log message that exceeded the limit.
message += " A complete log is still being written to Keep, and will be available when the job finishes."
return true, []byte(message)
- } else if partialLine {
- return false, line
} else {
return arvlog.logThrottleIsOpen, line
}