From bda9093be4d24d45a6fff29148fbb5438e283897 Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Fri, 8 Oct 2021 11:04:53 -0400
Subject: [PATCH] 16347: Add LocalKeepLogsToContainerLog config (none/errors/all).

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
 lib/config/config.default.yml     | 15 ++++++
 lib/config/export.go              |  1 +
 lib/config/generated_config.go    | 15 ++++++
 lib/config/load.go                | 12 +++++
 lib/crunchrun/crunchrun.go        | 23 +++++++--
 lib/crunchrun/integration_test.go | 78 +++++++++++++++++++------------
 lib/crunchrun/logging.go          | 60 ++++++++++++++++++++++++
 lib/crunchrun/logging_test.go     | 34 ++++++++++++++
 sdk/go/arvados/config.go          |  1 +
 9 files changed, 206 insertions(+), 33 deletions(-)

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 429b9fe48b..106ecdfac2 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -929,6 +929,21 @@ Clusters:
       # in memory for the duration of the container.
       LocalKeepBlobBuffersPerVCPU: 0
 
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "response" logs with 2xx
+      #   response codes and "request" logs
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/config/export.go b/lib/config/export.go
index f400dabbff..e36d6e76ca 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -120,6 +120,7 @@ var whitelist = map[string]bool{
 	"Containers.JobsAPI.Enable":              true,
 	"Containers.JobsAPI.GitInternalDir":      false,
 	"Containers.LocalKeepBlobBuffersPerVCPU": false,
+	"Containers.LocalKeepLogsToContainerLog": false,
 	"Containers.Logging":                     false,
 	"Containers.LogReuseDecisions":           false,
 	"Containers.LSF":                         false,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index dfa406bbe6..4207e6e4d9 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -935,6 +935,21 @@ Clusters:
       # in memory for the duration of the container.
       LocalKeepBlobBuffersPerVCPU: 0
 
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "response" logs with 2xx
+      #   response codes and "request" logs
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/config/load.go b/lib/config/load.go
index b6375c820f..a7419331f0 100644
--- a/lib/config/load.go
+++ b/lib/config/load.go
@@ -295,6 +295,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
 			ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken),
 			ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey),
 			checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+			ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
 			ldr.checkEmptyKeepstores(cc),
 			ldr.checkUnlistedKeepstores(cc),
 			ldr.checkLocalKeepstoreVolumes(cc),
@@ -339,6 +340,17 @@ func (ldr *Loader) checkToken(label, token string) error {
 	return nil
 }
 
+// checkEnum returns nil if value equals one of the accepted strings,
+// otherwise an error that names label, value, and the accepted set.
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+	for _, s := range accepted {
+		if s == value {
+			return nil
+		}
+	}
+	return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+}
+
 func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
 cluster:
 	for id, cc := range cfg.Clusters {
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ce060151dc..c9456ccc74 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -1775,14 +1775,29 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
-	if keepstore != nil {
-		w, err := cr.NewLogWriter("keepstore")
+	if keepstore == nil {
+		// Nothing is written to keepstoreLogbuf, no need to
+		// call SetWriter.
+	} else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+		keepstoreLogbuf.SetWriter(io.Discard)
+	} else {
+		logwriter, err := cr.NewLogWriter("keepstore")
 		if err != nil {
 			log.Print(err)
 			return 1
 		}
-		cr.keepstoreLogger = NewThrottledLogger(w)
-		err = keepstoreLogbuf.SetWriter(cr.keepstoreLogger)
+		cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+		var writer io.WriteCloser = cr.keepstoreLogger
+		if logWhat == "errors" {
+			writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+		} else if logWhat != "all" {
+			// should have been caught earlier by
+			// dispatcher's config loader
+			log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+			return 1
+		}
+		err = keepstoreLogbuf.SetWriter(writer)
 		if err != nil {
 			log.Print(err)
 			return 1
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index 597490c578..8adddd7053 100644
--- a/lib/crunchrun/integration_test.go
+++ b/lib/crunchrun/integration_test.go
@@ -87,9 +87,6 @@ func (s *integrationSuite) SetUpSuite(c *C) {
 	})
 	c.Assert(err, IsNil)
 	c.Logf("input pdh %s", s.input.PortableDataHash)
-
-	s.logCollection = arvados.Collection{}
-	s.outputCollection = arvados.Collection{}
 }
 
 func (s *integrationSuite) TearDownSuite(c *C) {
@@ -106,6 +103,8 @@ func (s *integrationSuite) SetUpTest(c *C) {
 	s.stdin = bytes.Buffer{}
 	s.stdout = bytes.Buffer{}
 	s.stderr = bytes.Buffer{}
+	s.logCollection = arvados.Collection{}
+	s.outputCollection = arvados.Collection{}
 	s.cr = arvados.ContainerRequest{
 		Priority: 1,
 		State:    "Committed",
@@ -165,35 +164,56 @@ func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
 }
 
 func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
-	cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
-	c.Assert(err, IsNil)
-	cluster, err := cfg.GetCluster("")
-	c.Assert(err, IsNil)
-	for uuid, volume := range cluster.Volumes {
-		volume.AccessViaHosts = nil
-		volume.Replication = 2
-		cluster.Volumes[uuid] = volume
-	}
+	for _, trial := range []struct {
+		logConfig   string
+		matchGetReq Checker
+		matchPutReq Checker
+		// NOTE(review): matchStartupMessage is populated but never asserted below -- TODO add a check or drop it.
+		matchStartupMessage Checker
+	}{
+		{"none", Not(Matches), Not(Matches), Not(Matches)},
+		{"all", Matches, Matches, Matches},
+		{"errors", Not(Matches), Not(Matches), Matches},
+	} {
+		c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
+		s.SetUpTest(c)
 
-	s.stdin.Reset()
-	err = json.NewEncoder(&s.stdin).Encode(ConfigData{
-		Env:         nil,
-		KeepBuffers: 1,
-		Cluster:     cluster,
-	})
-	c.Assert(err, IsNil)
+		cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+		c.Assert(err, IsNil)
+		cluster, err := cfg.GetCluster("")
+		c.Assert(err, IsNil)
+		for uuid, volume := range cluster.Volumes {
+			volume.AccessViaHosts = nil
+			volume.Replication = 2
+			cluster.Volumes[uuid] = volume
+		}
+		cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
 
-	s.engine = "docker"
-	s.testRunTrivialContainer(c)
+		s.stdin.Reset()
+		err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+			Env:         nil,
+			KeepBuffers: 1,
+			Cluster:     cluster,
+		})
+		c.Assert(err, IsNil)
 
-	fs, err := s.logCollection.FileSystem(s.client, s.kc)
-	c.Assert(err, IsNil)
-	f, err := fs.Open("keepstore.txt")
-	c.Assert(err, IsNil)
-	buf, err := ioutil.ReadAll(f)
-	c.Assert(err, IsNil)
-	c.Check(string(buf), Matches, `(?ms).*"reqMethod":"GET".*`)
-	c.Check(string(buf), Matches, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+		s.engine = "docker"
+		s.testRunTrivialContainer(c)
+
+		fs, err := s.logCollection.FileSystem(s.client, s.kc)
+		c.Assert(err, IsNil)
+		f, err := fs.Open("keepstore.txt")
+		if trial.logConfig == "none" {
+			c.Check(err, NotNil)
+			c.Check(os.IsNotExist(err), Equals, true)
+		} else {
+			c.Assert(err, IsNil)
+			buf, err := ioutil.ReadAll(f)
+			c.Assert(err, IsNil)
+			c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+			c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+		}
+	}
 }
 
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go
index 050894383d..76a55c4992 100644
--- a/lib/crunchrun/logging.go
+++ b/lib/crunchrun/logging.go
@@ -7,6 +7,7 @@ package crunchrun
 import (
 	"bufio"
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"io"
 	"log"
@@ -404,3 +405,62 @@ func loadLogThrottleParams(clnt IArvadosClient) {
 
 	loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 }
+
+// filterKeepstoreErrorsOnly wraps a WriteCloser, splits incoming data
+// into newline-terminated lines, and forwards a line to the wrapped
+// WriteCloser only if check() accepts it.
+type filterKeepstoreErrorsOnly struct {
+	io.WriteCloser
+	buf []byte
+}
+
+// Write appends p to the pending buffer, forwards each complete line
+// that passes check() (including its trailing newline), and keeps any
+// trailing partial line buffered until a later call completes it.
+func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
+	f.buf = append(f.buf, p...)
+	start := 0
+	for i := len(f.buf) - len(p); i < len(f.buf); i++ {
+		if f.buf[i] == '\n' {
+			if f.check(f.buf[start:i]) {
+				_, err := f.WriteCloser.Write(f.buf[start : i+1])
+				if err != nil {
+					return 0, err
+				}
+			}
+			start = i + 1
+		}
+	}
+	if start > 0 {
+		copy(f.buf, f.buf[start:])
+		f.buf = f.buf[:len(f.buf)-start]
+	}
+	return len(p), nil
+}
+
+// check reports whether line should be passed through. Empty lines,
+// JSON objects with msg "request", and JSON objects with msg
+// "response" and a 2xx respStatusCode are dropped; everything else,
+// including lines that do not parse as a JSON object, is kept.
+func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
+	if len(line) == 0 {
+		return false
+	}
+	if line[0] != '{' {
+		return true
+	}
+	var m map[string]interface{}
+	err := json.Unmarshal(line, &m)
+	if err != nil {
+		return true
+	}
+	if m["msg"] == "request" {
+		return false
+	}
+	if m["msg"] == "response" {
+		if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
+			return false
+		}
+	}
+	return true
+}
diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go
index 55460af379..fdd4f27b7f 100644
--- a/lib/crunchrun/logging_test.go
+++ b/lib/crunchrun/logging_test.go
@@ -5,7 +5,9 @@
 package crunchrun
 
 import (
+	"bytes"
 	"fmt"
+	"io"
 	"strings"
 	"testing"
 	"time"
@@ -13,6 +15,7 @@ import (
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	. "gopkg.in/check.v1"
+	check "gopkg.in/check.v1"
 )
 
 type LoggingTestSuite struct {
@@ -219,3 +222,34 @@ func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string
 	c.Check(true, Equals, strings.Contains(stderrLog, expected))
 	c.Check(string(kc.Content), Equals, logtext)
 }
+
+type filterSuite struct{}
+
+var _ = Suite(&filterSuite{})
+
+func (*filterSuite) TestFilterKeepstoreErrorsOnly(c *check.C) {
+	var buf bytes.Buffer
+	f := filterKeepstoreErrorsOnly{WriteCloser: nopCloser{&buf}}
+	for _, s := range []string{
+		"not j",
+		"son\n" + `{"msg":"foo"}` + "\n{}\n" + `{"msg":"request"}` + "\n" + `{"msg":1234}` + "\n\n",
+		"\n[\n",
+		`{"msg":"response","respStatusCode":404,"foo": "bar"}` + "\n",
+		`{"msg":"response","respStatusCode":206}` + "\n",
+	} {
+		f.Write([]byte(s))
+	}
+	c.Check(buf.String(), check.Equals, `not json
+{"msg":"foo"}
+{}
+{"msg":1234}
+[
+{"msg":"response","respStatusCode":404,"foo": "bar"}
+`)
+}
+
+type nopCloser struct {
+	io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index b84e1eefaa..e736f79fd7 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -436,6 +436,7 @@ type ContainersConfig struct {
 	UsePreemptibleInstances     bool
 	RuntimeEngine               string
 	LocalKeepBlobBuffersPerVCPU int
+	LocalKeepLogsToContainerLog string
 
 	JobsAPI struct {
 		Enable string
-- 
2.30.2