// Get and save the raw JSON container record from the API server
func (runner *ContainerRunner) LogContainerRecord() (err error) {
- w := &ArvLogWriter{
- ArvClient: runner.ArvClient,
- UUID: runner.Container.UUID,
- loggingStream: "container",
- writeCloser: runner.LogCollection.Open("container.json"),
- }
+ w := NewArvLogWriter(runner.ArvClient, runner.Container.UUID, "container",
+ runner.LogCollection.Open("container.json"))
// Get Container record JSON from the API Server
reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
if err != nil {
// point, but re-open crunch log with ArvClient in case there are any
// other further errors (such as failing to write the log to Keep!) while
// shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
- UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+ runner.CrunchLog = NewThrottledLogger(NewArvLogWriter(runner.ArvClient, runner.Container.UUID, "crunch-run", nil))
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// NewArvLogWriter creates an ArvLogWriter for the logging stream called name.
// The writer uses the runner's ArvClient and container UUID, and its
// underlying write-closer is the file name+".txt" opened in the runner's
// LogCollection.
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, writeCloser: runner.LogCollection.Open(name + ".txt")}
+ return NewArvLogWriter(runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name+".txt"))
}
// Run the full container lifecycle.
logThrottleFirstPartialLine bool
stderrBufToFlush bytes.Buffer
stderrFlushedAt time.Time
+
+ // rate limiting config parameters
+ crunchLimitLogBytesPerJob int64
+ crunchLogThrottleBytes int64
+ crunchLogThrottlePeriod int
+ crunchLogThrottleLines int64
+ crunchLogPartialLineThrottlePeriod int
+ crunchLogBytesPerEvent int64
+ crunchLogSecondsBetweenEvents int
+}
+
+// NewArvLogWriter creates a new ArvLogWriter and loads the rate limiting
+// config parameters once, at construction time, via clnt.Discovery.
+//
+// clnt is the API client used for the discovery lookups and stored on the
+// writer; uuid and ls are stored as the writer's UUID and logging stream
+// name; wc is stored as the underlying write-closer (callers may pass nil).
+//
+// Every parameter falls back to its built-in default when the discovery
+// lookup returns an error or yields a value that is not a float64, so a
+// malformed discovery value can no longer panic the constructor.
+func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
+	w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
+
+	// discoveryInt fetches one numeric discovery parameter, returning
+	// fallback when the lookup fails or the value has an unexpected type.
+	// Values are expected as float64 (presumably because they come from
+	// decoded JSON -- confirm against the Discovery implementation).
+	discoveryInt := func(name string, fallback int64) int64 {
+		param, err := clnt.Discovery(name)
+		if err != nil {
+			return fallback
+		}
+		value, ok := param.(float64)
+		if !ok {
+			return fallback
+		}
+		return int64(value)
+	}
+
+	// load the rate limit discovery config parameters
+	w.crunchLimitLogBytesPerJob = discoveryInt("crunchLimitLogBytesPerJob", 67108864)
+	w.crunchLogThrottleBytes = discoveryInt("crunchLogThrottleBytes", 65536)
+	w.crunchLogThrottlePeriod = int(discoveryInt("crunchLogThrottlePeriod", 60))
+	w.crunchLogThrottleLines = discoveryInt("crunchLogThrottleLines", 1024)
+	w.crunchLogPartialLineThrottlePeriod = int(discoveryInt("crunchLogPartialLineThrottlePeriod", 5))
+	w.crunchLogBytesPerEvent = discoveryInt("crunchLogBytesPerEvent", 4096)
+	w.crunchLogSecondsBetweenEvents = int(discoveryInt("crunchLogSecondsBetweenEvents", 1))
+
+	return w
}
func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
}
// write to API after checking rate limit
- crunchLogThrottlePeriod, err2 := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
- crunchLogBytesPerEvent, err2 := arvlog.ArvClient.Discovery("crunchLogBytesPerEvent")
- crunchLogSecondsBetweenEvents, err2 := arvlog.ArvClient.Discovery("crunchLogSecondsBetweenEvents")
- if err2 != nil {
- return 0, fmt.Errorf("%s ; %s", err1, err2)
- }
-
now := time.Now()
bytesWritten := 0
arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now().UTC()), arvlog.logThrottleBytesSkipped))
}
- arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(int(crunchLogThrottlePeriod.(float64))))
+ arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
arvlog.logThrottleBytesSoFar = 0
arvlog.logThrottleLinesSoFar = 0
arvlog.logThrottleBytesSkipped = 0
arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
}
- if arvlog.stderrBufToFlush.Len() > int(crunchLogBytesPerEvent.(float64)) ||
- (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(int64(crunchLogSecondsBetweenEvents.(float64)))) {
+ if int64(arvlog.stderrBufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
+ (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
// write to API
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
"object_uuid": arvlog.UUID,
skipCounts := false
if arvlog.logThrottleIsOpen {
- crunchLogPartialLineThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogPartialLineThrottlePeriod")
- crunchLimitLogBytesPerJob, err := arvlog.ArvClient.Discovery("crunchLimitLogBytesPerJob")
- crunchLogThrottleBytes, err := arvlog.ArvClient.Discovery("crunchLogThrottleBytes")
- crunchLogThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
- crunchLogThrottleLines, err := arvlog.ArvClient.Discovery("crunchLogThrottleLines")
- if err != nil {
- return false, []byte(""), err
- }
-
matches := lineRegexp.FindStringSubmatch(string(line))
if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
partialLine = true
- if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(int(crunchLogPartialLineThrottlePeriod.(float64))))) {
+ if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
arvlog.logThrottlePartialLineLastAt = time.Now()
} else {
skipCounts = true
arvlog.bytesLogged += lineSize
}
- if arvlog.bytesLogged > int64(crunchLimitLogBytesPerJob.(float64)) {
- message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), int(crunchLimitLogBytesPerJob.(float64)))
+ if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
+ message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLimitLogBytesPerJob)
arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
arvlog.logThrottleIsOpen = false
- } else if arvlog.logThrottleBytesSoFar > int64(crunchLogThrottleBytes.(float64)) {
+ } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
- message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogThrottleBytes.(float64)), int(crunchLogThrottlePeriod.(float64)), remainingTime/time.Second)
+ message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
- } else if arvlog.logThrottleLinesSoFar > int64(crunchLogThrottleLines.(float64)) {
+ } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
- message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogThrottleLines.(float64)), int(crunchLogThrottlePeriod.(float64)), remainingTime/time.Second)
+ message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
} else if partialLine && arvlog.logThrottleFirstPartialLine {
arvlog.logThrottleFirstPartialLine = false
- message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogPartialLineThrottlePeriod.(float64)))
+ message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
}
}