*log.Logger
buf *bytes.Buffer
sync.Mutex
- writer io.WriteCloser
- stopping chan struct{}
- stopped chan struct{}
+ writer io.WriteCloser
+ flush chan struct{}
+ stopped chan struct{}
Timestamper
Immediate *log.Logger
}
// Periodically check the current buffer; if not empty, send it on the
// channel to the goWriter goroutine.
func (tl *ThrottledLogger) flusher() {
- ticker := time.NewTicker(time.Second)
+ ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
defer ticker.Stop()
for stopping := false; !stopping; {
select {
- case <-tl.stopping:
- // flush tl.buf, then exit the loop
- stopping = true
+ case _, open := <-tl.flush:
+ // if !open, flush tl.buf and exit the loop
+ stopping = !open
case <-ticker.C:
}
// underlying Writer.
func (tl *ThrottledLogger) Close() error {
select {
- case <-tl.stopping:
+ case <-tl.flush:
// already stopped
default:
- close(tl.stopping)
+ close(tl.flush)
}
<-tl.stopped
return tl.writer.Close()
// per second.
func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
tl := &ThrottledLogger{}
- tl.stopping = make(chan struct{})
+ tl.flush = make(chan struct{}, 1)
tl.stopped = make(chan struct{})
tl.writer = writer
tl.Logger = log.New(tl, "", 0)
return tl
}
+// Log throttling rate limiting config parameters
+var crunchLimitLogBytesPerJob int64
+var crunchLogThrottleBytes int64
+var crunchLogThrottlePeriod int
+var crunchLogThrottleLines int64
+var crunchLogPartialLineThrottlePeriod int
+var crunchLogBytesPerEvent int64
+var crunchLogSecondsBetweenEvents int
+
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
// CollectionFileWriter) and creating an Arvados log entry.
logThrottleFirstPartialLine bool
bufToFlush bytes.Buffer
bufFlushedAt time.Time
-
- // rate limiting config parameters
- crunchLimitLogBytesPerJob int64
- crunchLogThrottleBytes int64
- crunchLogThrottlePeriod int
- crunchLogThrottleLines int64
- crunchLogPartialLineThrottlePeriod int
- crunchLogBytesPerEvent int64
- crunchLogSecondsBetweenEvents int
-}
-
-// NewArvLogWriter creates new ArvLogWriter and loads the rate limiting config params
-func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
- w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
-
- // load the rate limit discovery config paramters
- param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
- if err != nil {
- w.crunchLimitLogBytesPerJob = 67108864
- } else {
- w.crunchLimitLogBytesPerJob = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottleBytes")
- if err != nil {
- w.crunchLogThrottleBytes = 65536
- } else {
- w.crunchLogThrottleBytes = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottlePeriod")
- if err != nil {
- w.crunchLogThrottlePeriod = 60
- } else {
- w.crunchLogThrottlePeriod = int(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottleLines")
- if err != nil {
- w.crunchLogThrottleLines = 1024
- } else {
- w.crunchLogThrottleLines = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
- if err != nil {
- w.crunchLogPartialLineThrottlePeriod = 5
- } else {
- w.crunchLogPartialLineThrottlePeriod = int(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogBytesPerEvent")
- if err != nil {
- w.crunchLogBytesPerEvent = 4096
- } else {
- w.crunchLogBytesPerEvent = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
- if err != nil {
- w.crunchLogSecondsBetweenEvents = 1
- } else {
- w.crunchLogSecondsBetweenEvents = int(param.(float64))
- }
-
- return w
}
func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
}
- arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
+ arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
arvlog.logThrottleBytesSoFar = 0
arvlog.logThrottleLinesSoFar = 0
arvlog.logThrottleBytesSkipped = 0
arvlog.logThrottleIsOpen = logOpen
}
- if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
- (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
+ if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+ (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
// write to API
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
"object_uuid": arvlog.UUID,
if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
partialLine = true
- if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
+ if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
arvlog.logThrottlePartialLineLastAt = now
} else {
skipCounts = true
arvlog.bytesLogged += lineSize
}
- if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
- message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
+ if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
+ message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
+ RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
arvlog.logThrottleIsOpen = false
- } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
+ } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
remainingTime := arvlog.logThrottleResetTime.Sub(now)
- message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+ message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
- } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
+ } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
remainingTime := arvlog.logThrottleResetTime.Sub(now)
- message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+ message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
} else if partialLine && arvlog.logThrottleFirstPartialLine {
arvlog.logThrottleFirstPartialLine = false
- message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
+ message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
}
}
return arvlog.logThrottleIsOpen, line
}
}
+
+// load the rate limit discovery config paramters
+func loadLogThrottleParams(clnt IArvadosClient) {
+ param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
+ if err != nil {
+ crunchLimitLogBytesPerJob = 67108864
+ } else {
+ crunchLimitLogBytesPerJob = int64(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogThrottleBytes")
+ if err != nil {
+ crunchLogThrottleBytes = 65536
+ } else {
+ crunchLogThrottleBytes = int64(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogThrottlePeriod")
+ if err != nil {
+ crunchLogThrottlePeriod = 60
+ } else {
+ crunchLogThrottlePeriod = int(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogThrottleLines")
+ if err != nil {
+ crunchLogThrottleLines = 1024
+ } else {
+ crunchLogThrottleLines = int64(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
+ if err != nil {
+ crunchLogPartialLineThrottlePeriod = 5
+ } else {
+ crunchLogPartialLineThrottlePeriod = int(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogBytesPerEvent")
+ if err != nil {
+ crunchLogBytesPerEvent = 4096
+ } else {
+ crunchLogBytesPerEvent = int64(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
+ if err != nil {
+ crunchLogSecondsBetweenEvents = 1
+ } else {
+ crunchLogSecondsBetweenEvents = int(param.(float64))
+ }
+}