16347: Add LocalKeepLogsToContainerLog config (none/errors/all).
author Tom Clegg <tom@curii.com>
Fri, 8 Oct 2021 15:04:53 +0000 (11:04 -0400)
committer Tom Clegg <tom@curii.com>
Fri, 8 Oct 2021 15:21:03 +0000 (11:21 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/config/load.go
lib/crunchrun/crunchrun.go
lib/crunchrun/integration_test.go
lib/crunchrun/logging.go
lib/crunchrun/logging_test.go
sdk/go/arvados/config.go

index 429b9fe48bb02cd4447e24c5aa2f799cc4cc30fe..106ecdfac216d5aad9a7e9ca8667f7ae5f0513c5 100644 (file)
@@ -929,6 +929,21 @@ Clusters:
       # in memory for the duration of the container.
       LocalKeepBlobBuffersPerVCPU: 0
 
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "request" logs and "response"
+      #   logs with 2xx response codes
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
index f400dabbff8c8768739f2e4fae262c21e0639dd4..e36d6e76cae40d17c54217e643bf82341e298b87 100644 (file)
@@ -120,6 +120,7 @@ var whitelist = map[string]bool{
        "Containers.JobsAPI.Enable":                           true,
        "Containers.JobsAPI.GitInternalDir":                   false,
        "Containers.LocalKeepBlobBuffersPerVCPU":              false,
+       "Containers.LocalKeepLogsToContainerLog":              false,
        "Containers.Logging":                                  false,
        "Containers.LogReuseDecisions":                        false,
        "Containers.LSF":                                      false,
index dfa406bbe63a4887ca10ac40e6d24165257176d3..4207e6e4d975277cb6a4a37d1d468876c767104b 100644 (file)
@@ -935,6 +935,21 @@ Clusters:
       # in memory for the duration of the container.
       LocalKeepBlobBuffersPerVCPU: 0
 
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "request" logs and "response"
+      #   logs with 2xx response codes
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
index b6375c820fa110a1b85fe009261a3cae6e5127c9..a7419331f0788df0c41dbe9031db17ad73562ce3 100644 (file)
@@ -295,6 +295,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
                        ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken),
                        ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey),
                        checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+                       ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
                        ldr.checkEmptyKeepstores(cc),
                        ldr.checkUnlistedKeepstores(cc),
                        ldr.checkLocalKeepstoreVolumes(cc),
@@ -339,6 +340,15 @@ func (ldr *Loader) checkToken(label, token string) error {
        return nil
 }
 
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+       for _, s := range accepted {
+               if s == value {
+                       return nil
+               }
+       }
+       return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+}
+
 func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
 cluster:
        for id, cc := range cfg.Clusters {
index ce060151dc133f232ea9bac237bc59de49612e21..c9456ccc743ba2989901e5e561865f5ee81539d1 100644 (file)
@@ -1775,14 +1775,29 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
-       if keepstore != nil {
-               w, err := cr.NewLogWriter("keepstore")
+       if keepstore == nil {
+               // Nothing is written to keepstoreLogbuf, no need to
+               // call SetWriter.
+       } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+               keepstoreLogbuf.SetWriter(io.Discard)
+       } else {
+               logwriter, err := cr.NewLogWriter("keepstore")
                if err != nil {
                        log.Print(err)
                        return 1
                }
-               cr.keepstoreLogger = NewThrottledLogger(w)
-               err = keepstoreLogbuf.SetWriter(cr.keepstoreLogger)
+               cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+               var writer io.WriteCloser = cr.keepstoreLogger
+               if logWhat == "errors" {
+                       writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+               } else if logWhat != "all" {
+                       // should have been caught earlier by
+                       // dispatcher's config loader
+                       log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+                       return 1
+               }
+               err = keepstoreLogbuf.SetWriter(writer)
                if err != nil {
                        log.Print(err)
                        return 1
index 597490c5788d6a3ddf0e351023d81f8413baef62..8adddd7053921ee25a18a8eead02af6d7c464c77 100644 (file)
@@ -87,9 +87,6 @@ func (s *integrationSuite) SetUpSuite(c *C) {
        })
        c.Assert(err, IsNil)
        c.Logf("input pdh %s", s.input.PortableDataHash)
-
-       s.logCollection = arvados.Collection{}
-       s.outputCollection = arvados.Collection{}
 }
 
 func (s *integrationSuite) TearDownSuite(c *C) {
@@ -106,6 +103,8 @@ func (s *integrationSuite) SetUpTest(c *C) {
        s.stdin = bytes.Buffer{}
        s.stdout = bytes.Buffer{}
        s.stderr = bytes.Buffer{}
+       s.logCollection = arvados.Collection{}
+       s.outputCollection = arvados.Collection{}
        s.cr = arvados.ContainerRequest{
                Priority:       1,
                State:          "Committed",
@@ -165,35 +164,55 @@ func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
 }
 
 func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
-       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
-       c.Assert(err, IsNil)
-       cluster, err := cfg.GetCluster("")
-       c.Assert(err, IsNil)
-       for uuid, volume := range cluster.Volumes {
-               volume.AccessViaHosts = nil
-               volume.Replication = 2
-               cluster.Volumes[uuid] = volume
-       }
+       for _, trial := range []struct {
+               logConfig           string
+               matchGetReq         Checker
+               matchPutReq         Checker
+               matchStartupMessage Checker
+       }{
+               {"none", Not(Matches), Not(Matches), Not(Matches)},
+               {"all", Matches, Matches, Matches},
+               {"errors", Not(Matches), Not(Matches), Matches},
+       } {
+               c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
+               s.SetUpTest(c)
 
-       s.stdin.Reset()
-       err = json.NewEncoder(&s.stdin).Encode(ConfigData{
-               Env:         nil,
-               KeepBuffers: 1,
-               Cluster:     cluster,
-       })
-       c.Assert(err, IsNil)
+               cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+               c.Assert(err, IsNil)
+               cluster, err := cfg.GetCluster("")
+               c.Assert(err, IsNil)
+               for uuid, volume := range cluster.Volumes {
+                       volume.AccessViaHosts = nil
+                       volume.Replication = 2
+                       cluster.Volumes[uuid] = volume
+               }
+               cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
 
-       s.engine = "docker"
-       s.testRunTrivialContainer(c)
+               s.stdin.Reset()
+               err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+                       Env:         nil,
+                       KeepBuffers: 1,
+                       Cluster:     cluster,
+               })
+               c.Assert(err, IsNil)
 
-       fs, err := s.logCollection.FileSystem(s.client, s.kc)
-       c.Assert(err, IsNil)
-       f, err := fs.Open("keepstore.txt")
-       c.Assert(err, IsNil)
-       buf, err := ioutil.ReadAll(f)
-       c.Assert(err, IsNil)
-       c.Check(string(buf), Matches, `(?ms).*"reqMethod":"GET".*`)
-       c.Check(string(buf), Matches, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+               s.engine = "docker"
+               s.testRunTrivialContainer(c)
+
+               fs, err := s.logCollection.FileSystem(s.client, s.kc)
+               c.Assert(err, IsNil)
+               f, err := fs.Open("keepstore.txt")
+               if trial.logConfig == "none" {
+                       c.Check(err, NotNil)
+                       c.Check(os.IsNotExist(err), Equals, true)
+               } else {
+                       c.Assert(err, IsNil)
+                       buf, err := ioutil.ReadAll(f)
+                       c.Assert(err, IsNil)
+                       c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+                       c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+               }
+       }
 }
 
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
index 050894383d757a1e90e79999ef9fa8f2f08b9f01..76a55c4992bbd933085e83282391b3e3b241fb04 100644 (file)
@@ -7,6 +7,7 @@ package crunchrun
 import (
        "bufio"
        "bytes"
+       "encoding/json"
        "fmt"
        "io"
        "log"
@@ -404,3 +405,53 @@ func loadLogThrottleParams(clnt IArvadosClient) {
        loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
 }
+
// filterKeepstoreErrorsOnly is an io.WriteCloser that forwards
// keepstore log output to the embedded WriteCloser one complete line
// at a time. It drops lines that check rejects:
//   - empty lines
//   - JSON entries with "msg":"request"
//   - JSON entries with "msg":"response" and a 2xx "respStatusCode"
//
// Every other line, including non-JSON text and unparseable JSON, is
// passed through.
//
// Close is the embedded WriteCloser's Close. An unterminated trailing
// line still held in buf at that point is not written.
type filterKeepstoreErrorsOnly struct {
	io.WriteCloser
	// buf holds the bytes received since the last '\n', i.e., the
	// start of a line that is not yet complete. It never contains
	// a '\n' between calls to Write.
	buf []byte
}

// Write appends p to the pending buffer and forwards each newly
// completed line (including its trailing '\n') that passes check.
// An incomplete trailing line is kept for the next call.
//
// Every complete line is consumed even if the underlying writer
// fails. This keeps buf free of '\n', so lines are never re-sent or
// merged with later output. In that case Write returns 0 and the
// first error encountered; otherwise it returns len(p), nil.
func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
	f.buf = append(f.buf, p...)
	var firstErr error
	start := 0
	// Bytes carried over from earlier calls contain no '\n', so only
	// the newly appended bytes need to be scanned for line ends.
	for i := len(f.buf) - len(p); i < len(f.buf); i++ {
		if f.buf[i] != '\n' {
			continue
		}
		if f.check(f.buf[start:i]) {
			_, err := f.WriteCloser.Write(f.buf[start : i+1])
			if err != nil && firstErr == nil {
				firstErr = err
			}
		}
		start = i + 1
	}
	if start > 0 {
		// Shift the incomplete tail to the front, reusing buf's
		// backing array.
		f.buf = f.buf[:copy(f.buf, f.buf[start:])]
	}
	if firstErr != nil {
		return 0, firstErr
	}
	return len(p), nil
}

// check reports whether a single log line (without its trailing
// '\n') should be forwarded.
//
// Empty lines are dropped. Lines that do not start with '{', or that
// fail to decode as a JSON object, are kept. A decoded entry is
// dropped if its "msg" is "request", or if its "msg" is "response"
// and its numeric "respStatusCode" is in [200, 300). All other
// entries are kept.
func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
	if len(line) == 0 {
		return false
	}
	if line[0] != '{' {
		return true
	}
	var m map[string]interface{}
	if err := json.Unmarshal(line, &m); err != nil {
		return true
	}
	switch m["msg"] {
	case "request":
		return false
	case "response":
		// encoding/json decodes JSON numbers into float64; a
		// missing or non-numeric code yields 0, which is kept.
		code, _ := m["respStatusCode"].(float64)
		return code < 200 || code >= 300
	}
	return true
}
index 55460af379b3338423d9692e9a12dacc103a0591..fdd4f27b7f9af5463517e3658020795c75ceb8d5 100644 (file)
@@ -5,7 +5,9 @@
 package crunchrun
 
 import (
+       "bytes"
        "fmt"
+       "io"
        "strings"
        "testing"
        "time"
@@ -13,6 +15,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        . "gopkg.in/check.v1"
+       check "gopkg.in/check.v1"
 )
 
 type LoggingTestSuite struct {
@@ -219,3 +222,34 @@ func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string
        c.Check(true, Equals, strings.Contains(stderrLog, expected))
        c.Check(string(kc.Content), Equals, logtext)
 }
+
+type filterSuite struct{}
+
+var _ = Suite(&filterSuite{})
+
+func (*filterSuite) TestFilterKeepstoreErrorsOnly(c *check.C) {
+       var buf bytes.Buffer
+       f := filterKeepstoreErrorsOnly{WriteCloser: nopCloser{&buf}}
+       for _, s := range []string{
+               "not j",
+               "son\n" + `{"msg":"foo"}` + "\n{}\n" + `{"msg":"request"}` + "\n" + `{"msg":1234}` + "\n\n",
+               "\n[\n",
+               `{"msg":"response","respStatusCode":404,"foo": "bar"}` + "\n",
+               `{"msg":"response","respStatusCode":206}` + "\n",
+       } {
+               f.Write([]byte(s))
+       }
+       c.Check(buf.String(), check.Equals, `not json
+{"msg":"foo"}
+{}
+{"msg":1234}
+[
+{"msg":"response","respStatusCode":404,"foo": "bar"}
+`)
+}
+
// nopCloser wraps an io.Writer so it satisfies io.WriteCloser. Its
// Close does nothing, which lets tests hand a bytes.Buffer to code
// that requires a closable writer.
type nopCloser struct {
	io.Writer
}

var _ io.WriteCloser = nopCloser{}

// Close implements io.Closer. It has no effect and always returns nil.
func (nopCloser) Close() error {
	return nil
}
index b84e1eefaabfb154ed27478b9a39cc58bf629824..e736f79fd7f2bcee24e449596b4950bd279881bc 100644 (file)
@@ -436,6 +436,7 @@ type ContainersConfig struct {
        UsePreemptibleInstances     bool
        RuntimeEngine               string
        LocalKeepBlobBuffersPerVCPU int
+       LocalKeepLogsToContainerLog string
 
        JobsAPI struct {
                Enable         string