1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 // rfc3339NanoFixed is a fixed-width version of time.RFC3339Nano.
16 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
18 // prefixer wraps an io.Writer, inserting a string returned by
19 // prefixFunc at the beginning of each line.
20 type prefixer struct {
22 prefixFunc func() string
23 unfinished bool // true if the most recent write ended with a non-newline char
26 // newTimestamper wraps an io.Writer, inserting an RFC3339NanoFixed
27 // timestamp at the beginning of each line.
28 func newTimestamper(w io.Writer) *prefixer {
31 prefixFunc: func() string { return time.Now().UTC().Format(rfc3339NanoFixed + " ") },
35 // newStringPrefixer wraps an io.Writer, inserting the given string at
36 // the beginning of each line. The given string should include a
37 // trailing space for readability.
38 func newStringPrefixer(w io.Writer, s string) *prefixer {
41 prefixFunc: func() string { return s },
45 func (tp *prefixer) Write(p []byte) (n int, err error) {
46 for len(p) > 0 && err == nil {
48 _, err = io.WriteString(tp.writer, tp.prefixFunc())
53 newline := bytes.IndexRune(p, '\n')
57 nn, err = tp.writer.Write(p)
61 nn, err = tp.writer.Write(p[:newline+1])
69 // logWriter adds log.Logger methods to an io.Writer.
70 type logWriter struct {
75 func newLogWriter(w io.Writer) *logWriter {
78 Logger: log.New(w, "", 0),
82 var crunchLogUpdatePeriod = time.Hour / 2
83 var crunchLogUpdateSize = int64(1 << 25)
85 // load the rate limit discovery config parameters
86 func loadLogThrottleParams(clnt IArvadosClient) {
87 loadDuration := func(dst *time.Duration, key string) {
88 if param, err := clnt.Discovery(key); err != nil {
90 } else if d, ok := param.(float64); !ok {
93 *dst = time.Duration(d) * time.Second
96 loadInt64 := func(dst *int64, key string) {
97 if param, err := clnt.Discovery(key); err != nil {
99 } else if val, ok := param.(float64); !ok {
106 loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
107 loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
111 type filterKeepstoreErrorsOnly struct {
116 func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
117 log.Printf("filterKeepstoreErrorsOnly: write %q", p)
118 f.buf = append(f.buf, p...)
120 for i := len(f.buf) - len(p); i < len(f.buf); i++ {
121 if f.buf[i] == '\n' {
122 if f.check(f.buf[start:i]) {
123 _, err := f.WriteCloser.Write(f.buf[start : i+1])
132 copy(f.buf, f.buf[start:])
133 f.buf = f.buf[:len(f.buf)-start]
138 func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
145 var m map[string]interface{}
146 err := json.Unmarshal(line, &m)
150 if m["msg"] == "request" {
153 if m["msg"] == "response" {
154 if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {